diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index 8d2cf78234994..4a2418a7f53db 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -656,15 +656,16 @@ def _refresh_dag_bundles(self): bundle.initialize() # TODO: AIP-66 test to make sure we get a fresh record from the db and it's not cached with create_session() as session: - bundle_model = session.get(DagBundleModel, bundle.name) + bundle_model: DagBundleModel = session.get(DagBundleModel, bundle.name) elapsed_time_since_refresh = ( now - (bundle_model.last_refreshed or timezone.utc_epoch()) ).total_seconds() - current_version = bundle.get_current_version() + pre_refresh_version = bundle.get_current_version() + previously_seen = bundle.name in self._bundle_versions if ( elapsed_time_since_refresh < bundle.refresh_interval - and bundle_model.latest_version == current_version - and bundle.name in self._bundle_versions + and bundle_model.version == pre_refresh_version + and previously_seen ): self.log.info("Not time to refresh %s", bundle.name) continue @@ -677,17 +678,20 @@ def _refresh_dag_bundles(self): bundle_model.last_refreshed = now - new_version = bundle.get_current_version() + version_after_refresh = bundle.get_current_version() if bundle.supports_versioning: - # We can short-circuit the rest of the refresh if the version hasn't changed - # and we've already fully "refreshed" this bundle before in this dag processor. - if current_version == new_version and bundle.name in self._bundle_versions: + # We can short-circuit the rest of this if (1) bundle was seen before by + # this dag processor and (2) the version of the bundle did not change + # after refreshing it + if previously_seen and pre_refresh_version == version_after_refresh: self.log.debug("Bundle %s version not changed after refresh", bundle.name) continue - bundle_model.latest_version = new_version + bundle_model.version = version_after_refresh - self.log.info("Version changed for %s, new version: %s", bundle.name, new_version) + self.log.info( + "Version changed for %s, new version: %s", bundle.name, version_after_refresh + ) bundle_file_paths = self._find_files_in_bundle(bundle) diff --git a/airflow/migrations/versions/0050_3_0_0_add_dagbundlemodel.py b/airflow/migrations/versions/0050_3_0_0_add_dagbundlemodel.py index 2a8a7a6d99e9b..054e6eb03c7f2 100644 --- a/airflow/migrations/versions/0050_3_0_0_add_dagbundlemodel.py +++ b/airflow/migrations/versions/0050_3_0_0_add_dagbundlemodel.py @@ -43,7 +43,7 @@ def upgrade(): "dag_bundle", sa.Column("name", sa.String(length=250), nullable=False), sa.Column("active", sa.Boolean(), nullable=True), - sa.Column("latest_version", sa.String(length=200), nullable=True), + sa.Column("version", sa.String(length=200), nullable=True), sa.Column("last_refreshed", UtcDateTime(timezone=True), nullable=True), sa.PrimaryKeyConstraint("name", name=op.f("dag_bundle_pkey")), ) diff --git a/airflow/models/dagbundle.py b/airflow/models/dagbundle.py index 08429db0b0bcb..b07e84f3bee9c 100644 --- a/airflow/models/dagbundle.py +++ b/airflow/models/dagbundle.py @@ -29,14 +29,14 @@ class DagBundleModel(Base): We track the following information about each bundle, as it can be useful for informational purposes and for debugging: - active: Is the bundle currently found in configuration? - - latest_version: The latest version Airflow has seen for the bundle. + - version: The latest version Airflow has seen for the bundle. - last_refreshed: When the bundle was last refreshed. """ __tablename__ = "dag_bundle" name = Column(StringID(), primary_key=True) active = Column(Boolean, default=True) - latest_version = Column(String(200), nullable=True) + version = Column(String(200), nullable=True) last_refreshed = Column(UtcDateTime, nullable=True) def __init__(self, *, name: str): diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index 99c908819ccf1..5626cf9708b0b 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -a877009126cad78bdd6336ac298e42d76dbb29dc88d5ecb9e5344f95dfe9c2b7 \ No newline at end of file +cb858681fdc7a596db20c1c5dbf93812fd011a6df1e0b5322a49a51c8476bb93 \ No newline at end of file diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg index 198c05c1147c4..3fa6369924caa 100644 --- a/docs/apache-airflow/img/airflow_erd.svg +++ b/docs/apache-airflow/img/airflow_erd.svg @@ -1410,9 +1410,9 @@ [TIMESTAMP] -latest_version - - [VARCHAR(200)] +version + + [VARCHAR(200)] diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py index 1f10ed3eb1b67..42e975127797b 100644 --- a/tests/dag_processing/test_manager.py +++ b/tests/dag_processing/test_manager.py @@ -888,7 +888,7 @@ def _update_bundletwo_version(): # will believe another processor had seen a new version with create_session() as session: bundletwo_model = session.get(DagBundleModel, "bundletwo") - bundletwo_model.latest_version = "123" + bundletwo_model.version = "123" bundletwo.refresh.side_effect = _update_bundletwo_version manager = DagFileProcessorManager(max_runs=2) @@ -922,7 +922,7 @@ def test_bundles_versions_are_stored(self): with create_session() as session: model = session.get(DagBundleModel, "bundleone") - assert model.latest_version == "123" + assert model.version == "123" class TestDagFileProcessorAgent: