Skip to content

Commit

Permalink
Rename latest_version to version in bundle orm / code clarification (a…
Browse files Browse the repository at this point in the history
…pache#45720)

1. did a little work to try to make the code a little clearer in the dag processor where we compare versions.  Instead of current and new, which is confusing, I call it pre-refresh and after-refresh, essentially, which makes the logic a little more intelligible.  Also I provide a var `was seen` to show the intention of the `name in list` check.

2. I propose (and do so here) renaming `latest_version` to `version` in bundle for reasons similar to apache#45719.  I think it makes sense to think of the orm object as _itself_ representing the latest or current or last refreshed version. So the latest version cannot itself _have_ a latest version -- the latest version just has a version.  That's sorta conceptually the issue.  In any event, by saying less, we can let the docs explain the nuance.

---------

Co-authored-by: Jed Cunningham <[email protected]>
  • Loading branch information
dstandish and jedcunningham authored Jan 16, 2025
1 parent 439f7b1 commit 060eeb7
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 19 deletions.
24 changes: 14 additions & 10 deletions airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -656,15 +656,16 @@ def _refresh_dag_bundles(self):
bundle.initialize()
# TODO: AIP-66 test to make sure we get a fresh record from the db and it's not cached
with create_session() as session:
bundle_model = session.get(DagBundleModel, bundle.name)
bundle_model: DagBundleModel = session.get(DagBundleModel, bundle.name)
elapsed_time_since_refresh = (
now - (bundle_model.last_refreshed or timezone.utc_epoch())
).total_seconds()
current_version = bundle.get_current_version()
pre_refresh_version = bundle.get_current_version()
previously_seen = bundle.name in self._bundle_versions
if (
elapsed_time_since_refresh < bundle.refresh_interval
and bundle_model.latest_version == current_version
and bundle.name in self._bundle_versions
and bundle_model.version == pre_refresh_version
and previously_seen
):
self.log.info("Not time to refresh %s", bundle.name)
continue
Expand All @@ -677,17 +678,20 @@ def _refresh_dag_bundles(self):

bundle_model.last_refreshed = now

new_version = bundle.get_current_version()
version_after_refresh = bundle.get_current_version()
if bundle.supports_versioning:
# We can short-circuit the rest of the refresh if the version hasn't changed
# and we've already fully "refreshed" this bundle before in this dag processor.
if current_version == new_version and bundle.name in self._bundle_versions:
# We can short-circuit the rest of this if (1) bundle was seen before by
# this dag processor and (2) the version of the bundle did not change
# after refreshing it
if previously_seen and pre_refresh_version == version_after_refresh:
self.log.debug("Bundle %s version not changed after refresh", bundle.name)
continue

bundle_model.latest_version = new_version
bundle_model.version = version_after_refresh

self.log.info("Version changed for %s, new version: %s", bundle.name, new_version)
self.log.info(
"Version changed for %s, new version: %s", bundle.name, version_after_refresh
)

bundle_file_paths = self._find_files_in_bundle(bundle)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def upgrade():
"dag_bundle",
sa.Column("name", sa.String(length=250), nullable=False),
sa.Column("active", sa.Boolean(), nullable=True),
sa.Column("latest_version", sa.String(length=200), nullable=True),
sa.Column("version", sa.String(length=200), nullable=True),
sa.Column("last_refreshed", UtcDateTime(timezone=True), nullable=True),
sa.PrimaryKeyConstraint("name", name=op.f("dag_bundle_pkey")),
)
Expand Down
4 changes: 2 additions & 2 deletions airflow/models/dagbundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ class DagBundleModel(Base):
We track the following information about each bundle, as it can be useful for
informational purposes and for debugging:
- active: Is the bundle currently found in configuration?
- latest_version: The latest version Airflow has seen for the bundle.
- version: The latest version Airflow has seen for the bundle.
- last_refreshed: When the bundle was last refreshed.
"""

__tablename__ = "dag_bundle"
name = Column(StringID(), primary_key=True)
active = Column(Boolean, default=True)
latest_version = Column(String(200), nullable=True)
version = Column(String(200), nullable=True)
last_refreshed = Column(UtcDateTime, nullable=True)

def __init__(self, *, name: str):
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
a877009126cad78bdd6336ac298e42d76dbb29dc88d5ecb9e5344f95dfe9c2b7
cb858681fdc7a596db20c1c5dbf93812fd011a6df1e0b5322a49a51c8476bb93
6 changes: 3 additions & 3 deletions docs/apache-airflow/img/airflow_erd.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 2 additions & 2 deletions tests/dag_processing/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -888,7 +888,7 @@ def _update_bundletwo_version():
# will believe another processor had seen a new version
with create_session() as session:
bundletwo_model = session.get(DagBundleModel, "bundletwo")
bundletwo_model.latest_version = "123"
bundletwo_model.version = "123"

bundletwo.refresh.side_effect = _update_bundletwo_version
manager = DagFileProcessorManager(max_runs=2)
Expand Down Expand Up @@ -922,7 +922,7 @@ def test_bundles_versions_are_stored(self):

with create_session() as session:
model = session.get(DagBundleModel, "bundleone")
assert model.latest_version == "123"
assert model.version == "123"


class TestDagFileProcessorAgent:
Expand Down

0 comments on commit 060eeb7

Please sign in to comment.