diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index f56fadc21dc2b..e2a025f5130ef 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -985,6 +985,13 @@ repos:
files: \.py$
exclude: ^.*/.*_vendor/
additional_dependencies: ['rich>=12.4.4']
+ - id: check-compat-cache-on-methods
+ name: Check that compat cache do not use on class methods
+ entry: ./scripts/ci/pre_commit/compat_cache_on_methods.py
+ language: python
+ pass_filenames: true
+ files: ^airflow/.*\.py$
+ exclude: ^.*/.*_vendor/
- id: lint-chart-schema
name: Lint chart/values.schema.json file
entry: ./scripts/ci/pre_commit/chart_schema.py
diff --git a/airflow/compat/functools.pyi b/airflow/compat/functools.pyi
deleted file mode 100644
index 7d1bef5939e6e..0000000000000
--- a/airflow/compat/functools.pyi
+++ /dev/null
@@ -1,27 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# This stub exists to work around false linter errors due to python/mypy#10408.
-# TODO: Remove this file after the upstream fix is available in our toolchain.
-from __future__ import annotations
-
-from typing import TypeVar
-
-T = TypeVar("T")
-
-def cache(f: T) -> T: ...
diff --git a/airflow/models/abstractoperator.py b/airflow/models/abstractoperator.py
index c78eeb9ee7d6f..fa59d6cfc948d 100644
--- a/airflow/models/abstractoperator.py
+++ b/airflow/models/abstractoperator.py
@@ -23,9 +23,9 @@
from functools import cached_property
from typing import TYPE_CHECKING, Any, Callable, ClassVar, Collection, Iterable, Iterator, Sequence
+import methodtools
from sqlalchemy import select
-from airflow.compat.functools import cache
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.models.expandinput import NotFullyPopulated
@@ -493,7 +493,7 @@ def get_extra_links(self, ti: TaskInstance, link_name: str) -> str | None:
return link.get_link(self.unmap(None), ti.dag_run.logical_date) # type: ignore[misc]
return link.get_link(self.unmap(None), ti_key=ti.key)
- @cache
+ @methodtools.lru_cache(maxsize=None)
def get_parse_time_mapped_ti_count(self) -> int:
"""
Return the number of mapped task instances that can be created on DAG run creation.
diff --git a/airflow/models/mappedoperator.py b/airflow/models/mappedoperator.py
index 9e1200c0494c8..c02f7470edd5d 100644
--- a/airflow/models/mappedoperator.py
+++ b/airflow/models/mappedoperator.py
@@ -24,8 +24,8 @@
from typing import TYPE_CHECKING, Any, ClassVar, Collection, Iterable, Iterator, Mapping, Sequence, Union
import attr
+import methodtools
-from airflow.compat.functools import cache
from airflow.exceptions import AirflowException, UnmappableOperator
from airflow.models.abstractoperator import (
DEFAULT_EXECUTOR,
@@ -335,8 +335,8 @@ def __attrs_post_init__(self):
f"{self.task_id!r}."
)
+ @methodtools.lru_cache(maxsize=None)
@classmethod
- @cache
def get_serialized_fields(cls):
# Not using 'cls' here since we only want to serialize base fields.
return frozenset(attr.fields_dict(MappedOperator)) - {
@@ -352,8 +352,8 @@ def get_serialized_fields(cls):
"_on_failure_fail_dagrun",
}
+ @methodtools.lru_cache(maxsize=None)
@staticmethod
- @cache
def deps_for(operator_class: type[BaseOperator]) -> frozenset[BaseTIDep]:
operator_deps = operator_class.deps
if not isinstance(operator_deps, collections.abc.Set):
@@ -789,7 +789,7 @@ def iter_mapped_dependencies(self) -> Iterator[Operator]:
for operator, _ in XComArg.iter_xcom_references(self._get_specified_expand_input()):
yield operator
- @cache
+ @methodtools.lru_cache(maxsize=None)
def get_parse_time_mapped_ti_count(self) -> int:
current_count = self._get_specified_expand_input().get_parse_time_mapped_ti_count()
try:
diff --git a/airflow/utils/task_group.py b/airflow/utils/task_group.py
index 5f79600c47d66..50ce7b4089427 100644
--- a/airflow/utils/task_group.py
+++ b/airflow/utils/task_group.py
@@ -25,9 +25,9 @@
import weakref
from typing import TYPE_CHECKING, Any, Generator, Iterator, Sequence
+import methodtools
import re2
-from airflow.compat.functools import cache
from airflow.exceptions import (
AirflowDagCycleException,
AirflowException,
@@ -586,7 +586,7 @@ def iter_mapped_dependencies(self) -> Iterator[Operator]:
for op, _ in XComArg.iter_xcom_references(self._expand_input):
yield op
- @cache
+ @methodtools.lru_cache(maxsize=None)
def get_parse_time_mapped_ti_count(self) -> int:
"""
Return the Number of instances a task in this group should be mapped to, when a DAG run is created.
diff --git a/airflow/utils/weight_rule.py b/airflow/utils/weight_rule.py
index f65f2fa77e1af..a63358b0322ce 100644
--- a/airflow/utils/weight_rule.py
+++ b/airflow/utils/weight_rule.py
@@ -19,7 +19,7 @@
from enum import Enum
-from airflow.compat.functools import cache
+import methodtools
class WeightRule(str, Enum):
@@ -34,8 +34,8 @@ def is_valid(cls, weight_rule: str) -> bool:
"""Check if weight rule is valid."""
return weight_rule in cls.all_weight_rules()
+ @methodtools.lru_cache(maxsize=None)
@classmethod
- @cache
def all_weight_rules(cls) -> set[str]:
"""Return all weight rules."""
return set(cls.__members__.values())
diff --git a/contributing-docs/08_static_code_checks.rst b/contributing-docs/08_static_code_checks.rst
index c7be51b6a78fe..0469ee31ead3f 100644
--- a/contributing-docs/08_static_code_checks.rst
+++ b/contributing-docs/08_static_code_checks.rst
@@ -146,6 +146,8 @@ require Breeze Docker image to be built locally.
+-----------------------------------------------------------+--------------------------------------------------------------+---------+
| check-cncf-k8s-only-for-executors | Check cncf.kubernetes imports used for executors only | |
+-----------------------------------------------------------+--------------------------------------------------------------+---------+
+| check-compat-cache-on-methods | Check that compat cache do not use on class methods | |
++-----------------------------------------------------------+--------------------------------------------------------------+---------+
| check-core-deprecation-classes | Verify usage of Airflow deprecation classes in core | |
+-----------------------------------------------------------+--------------------------------------------------------------+---------+
| check-daysago-import-from-utils | Make sure days_ago is imported from airflow.utils.dates | |
diff --git a/dev/breeze/doc/images/output_static-checks.svg b/dev/breeze/doc/images/output_static-checks.svg
index a709f8070c9d9..2b5f586c73cac 100644
--- a/dev/breeze/doc/images/output_static-checks.svg
+++ b/dev/breeze/doc/images/output_static-checks.svg
@@ -322,19 +322,19 @@
│check-base-operator-partial-arguments | check-base-operator-usage | │
│check-boring-cyborg-configuration | check-breeze-top-dependencies-limited | │
│check-builtin-literals | check-changelog-has-no-duplicates | │
-│check-cncf-k8s-only-for-executors | check-core-deprecation-classes | │
-│check-daysago-import-from-utils | check-decorated-operator-implements-custom-name│
-│| check-deferrable-default-value | check-docstring-param-types | │
-│check-example-dags-urls | check-executables-have-shebangs | │
-│check-extra-packages-references | check-extras-order | check-fab-migrations | │
-│check-for-inclusive-language | check-google-re2-as-dependency | │
-│check-hatch-build-order | check-hooks-apply | check-incorrect-use-of-LoggingMixin│
-│| check-init-decorator-arguments | check-integrations-list-consistent | │
-│check-lazy-logging | check-links-to-example-dags-do-not-use-hardcoded-versions | │
-│check-merge-conflict | check-newsfragments-are-valid | │
-│check-no-airflow-deprecation-in-providers | check-no-providers-in-core-examples |│
-│check-only-new-session-with-provide-session | │
-│check-persist-credentials-disabled-in-github-workflows | │
+│check-cncf-k8s-only-for-executors | check-compat-cache-on-methods | │
+│check-core-deprecation-classes | check-daysago-import-from-utils | │
+│check-decorated-operator-implements-custom-name | check-deferrable-default-value │
+│| check-docstring-param-types | check-example-dags-urls | │
+│check-executables-have-shebangs | check-extra-packages-references | │
+│check-extras-order | check-fab-migrations | check-for-inclusive-language | │
+│check-google-re2-as-dependency | check-hatch-build-order | check-hooks-apply | │
+│check-incorrect-use-of-LoggingMixin | check-init-decorator-arguments | │
+│check-integrations-list-consistent | check-lazy-logging | │
+│check-links-to-example-dags-do-not-use-hardcoded-versions | check-merge-conflict │
+│| check-newsfragments-are-valid | check-no-airflow-deprecation-in-providers | │
+│check-no-providers-in-core-examples | check-only-new-session-with-provide-session│
+│| check-persist-credentials-disabled-in-github-workflows | │
│check-pre-commit-information-consistent | check-provide-create-sessions-imports |│
│check-provider-docs-valid | check-provider-yaml-valid | │
│check-providers-init-file-missing | check-providers-subpackages-init-file-exist |│
diff --git a/dev/breeze/doc/images/output_static-checks.txt b/dev/breeze/doc/images/output_static-checks.txt
index 10228461e0c29..37736eb0039b0 100644
--- a/dev/breeze/doc/images/output_static-checks.txt
+++ b/dev/breeze/doc/images/output_static-checks.txt
@@ -1 +1 @@
-4dc5653769bd03c4d07f6d998cdcb679
+74e4402e7abd79d80bca112783891cb1
diff --git a/dev/breeze/src/airflow_breeze/pre_commit_ids.py b/dev/breeze/src/airflow_breeze/pre_commit_ids.py
index 867f16e0a9252..caa51ca62c5d2 100644
--- a/dev/breeze/src/airflow_breeze/pre_commit_ids.py
+++ b/dev/breeze/src/airflow_breeze/pre_commit_ids.py
@@ -38,6 +38,7 @@
"check-builtin-literals",
"check-changelog-has-no-duplicates",
"check-cncf-k8s-only-for-executors",
+ "check-compat-cache-on-methods",
"check-core-deprecation-classes",
"check-daysago-import-from-utils",
"check-decorated-operator-implements-custom-name",
diff --git a/hatch_build.py b/hatch_build.py
index 4835c5af824ee..2b88b93591732 100644
--- a/hatch_build.py
+++ b/hatch_build.py
@@ -462,6 +462,7 @@
"markupsafe>=1.1.1",
"marshmallow-oneofschema>=2.0.1",
"mdit-py-plugins>=0.3.0",
+ "methodtools>=0.4.7",
"opentelemetry-api>=1.15.0",
"opentelemetry-exporter-otlp",
"packaging>=14.0",
diff --git a/pyproject.toml b/pyproject.toml
index 39ee0703cebe3..bfee8d627dfd5 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -284,9 +284,10 @@ extend-select = [
"D419",
"PGH004", # Use specific rule codes when using noqa
"PGH005", # Invalid unittest.mock.Mock methods/attributes/properties
- "B006", # Checks for uses of mutable objects as function argument defaults.
"S101", # Checks use `assert` outside the test cases, test cases should be added into the exclusions
"B004", # Checks for use of hasattr(x, "__call__") and replaces it with callable(x)
+ "B006", # Checks for uses of mutable objects as function argument defaults.
+ "B019", # Use of functools.lru_cache or functools.cache on methods can lead to memory leaks
]
ignore = [
"D203",
diff --git a/scripts/ci/pre_commit/compat_cache_on_methods.py b/scripts/ci/pre_commit/compat_cache_on_methods.py
new file mode 100755
index 0000000000000..5fee74ff2a4ac
--- /dev/null
+++ b/scripts/ci/pre_commit/compat_cache_on_methods.py
@@ -0,0 +1,69 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import ast
+import pathlib
+import sys
+
+COMPAT_MODULE = "airflow.compat.functools"
+
+
+def check_test_file(file: str) -> int:
+ node = ast.parse(pathlib.Path(file).read_text("utf-8"), file)
+ if not (classes := [c for c in node.body if isinstance(c, ast.ClassDef)]):
+ # Exit early if module doesn't contain any classes
+ return 0
+
+ compat_cache_aliases = []
+ for stmt in node.body:
+ if not isinstance(stmt, ast.ImportFrom) or stmt.module != COMPAT_MODULE:
+ continue
+ for alias in stmt.names:
+ if "cache" in alias.name:
+ compat_cache_aliases.append(alias.asname or alias.name)
+ if not compat_cache_aliases:
+ # Exit early in case if there are no imports from `airflow.compat.functools.cache`
+ return 0
+
+ found = 0
+ for klass in classes:
+ for cls_stmt in klass.body:
+ if not isinstance(cls_stmt, ast.FunctionDef) or not cls_stmt.decorator_list:
+ continue
+ for decorator in cls_stmt.decorator_list:
+ if (isinstance(decorator, ast.Name) and decorator.id in compat_cache_aliases) or (
+ isinstance(decorator, ast.Attribute) and decorator.attr in compat_cache_aliases
+ ):
+ found += 1
+ prefix = f"{file}:{decorator.lineno}:"
+ print(f"{prefix} Use of `{COMPAT_MODULE}.cache` on methods can lead to memory leaks")
+
+ return found
+
+
+def main(*args: str) -> int:
+ errors = sum(check_test_file(file) for file in args[1:])
+ if not errors:
+ return 0
+ print(f"Found {errors} error{'s' if errors > 1 else ''}.")
+ return 1
+
+
+if __name__ == "__main__":
+ sys.exit(main(*sys.argv))