From 163042fd3a73b8290f2402c632fa98b1a8ab36ae Mon Sep 17 00:00:00 2001 From: Jens Scheffler <95105677+jscheffl@users.noreply.github.com> Date: Fri, 6 Dec 2024 19:43:51 +0100 Subject: [PATCH] Remove Provider Deprecations in Postgres (#44705) * Remove Provider Deprecations in Postgres * Add spelling wordlist * Remove docs from removed operator --- .../postgres_operator_howto_guide.rst | 4 +- docs/spelling_wordlist.txt | 1 + .../airflow/providers/postgres/CHANGELOG.rst | 15 ++ .../providers/postgres/hooks/postgres.py | 33 --- .../providers/postgres/operators/__init__.py | 17 -- .../providers/postgres/operators/postgres.py | 88 ------- .../airflow/providers/postgres/provider.yaml | 5 - .../tests/postgres/hooks/test_postgres.py | 10 - .../tests/postgres/operators/__init__.py | 17 -- .../tests/postgres/operators/test_postgres.py | 231 ------------------ 10 files changed, 18 insertions(+), 403 deletions(-) delete mode 100644 providers/src/airflow/providers/postgres/operators/__init__.py delete mode 100644 providers/src/airflow/providers/postgres/operators/postgres.py delete mode 100644 providers/tests/postgres/operators/__init__.py delete mode 100644 providers/tests/postgres/operators/test_postgres.py diff --git a/docs/apache-airflow-providers-postgres/operators/postgres_operator_howto_guide.rst b/docs/apache-airflow-providers-postgres/operators/postgres_operator_howto_guide.rst index d1b20dcea172f..da7f8252c0e2f 100644 --- a/docs/apache-airflow-providers-postgres/operators/postgres_operator_howto_guide.rst +++ b/docs/apache-airflow-providers-postgres/operators/postgres_operator_howto_guide.rst @@ -31,8 +31,8 @@ A task defined or implemented by a operator is a unit of work in your data pipel The purpose of this guide is to define tasks involving interactions with a PostgreSQL database with the :class:`~airflow.providers.common.sql.operators.SQLExecuteQueryOperator`. -.. warning:: - Previously, PostgresOperator was used to perform this kind of operation. But at the moment PostgresOperator is deprecated and will be removed in future versions of the provider. Please consider to switch to SQLExecuteQueryOperator as soon as possible. +.. note:: + Previously, PostgresOperator was used to perform this kind of operation. After deprecation this has been removed. Please use SQLExecuteQueryOperator instead. Common Database Operations with SQLExecuteQueryOperator ------------------------------------------------------- diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 69656b6d08689..cee69fcf032cd 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -684,6 +684,7 @@ getframe getint GetPartitions getsource +getters gevent GH GiB diff --git a/providers/src/airflow/providers/postgres/CHANGELOG.rst b/providers/src/airflow/providers/postgres/CHANGELOG.rst index 6668e60ca5232..26715e47860ac 100644 --- a/providers/src/airflow/providers/postgres/CHANGELOG.rst +++ b/providers/src/airflow/providers/postgres/CHANGELOG.rst @@ -27,6 +27,21 @@ Changelog --------- +main +.... + +Breaking changes +~~~~~~~~~~~~~~~~ + +.. warning:: + All deprecated classes, parameters and features have been removed from the Postgres provider package. + The following breaking changes were introduced: + + * Hooks + * The ``schema`` arg has been renamed to ``database`` as it contained the database name. Deprecated parameters, getters and setters have been removed. Please use ``database`` to set the database name. + * Operators + * Remove ``airflow.providers.postgres.operators.postgres.PostgresOperator``. Please use ``airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator`` instead. + 5.14.0 ...... diff --git a/providers/src/airflow/providers/postgres/hooks/postgres.py b/providers/src/airflow/providers/postgres/hooks/postgres.py index f4a4c4e60e8f9..f5dcfe2df49af 100644 --- a/providers/src/airflow/providers/postgres/hooks/postgres.py +++ b/providers/src/airflow/providers/postgres/hooks/postgres.py @@ -18,7 +18,6 @@ from __future__ import annotations import os -import warnings from collections.abc import Iterable from contextlib import closing from copy import deepcopy @@ -27,11 +26,9 @@ import psycopg2 import psycopg2.extensions import psycopg2.extras -from deprecated import deprecated from psycopg2.extras import DictCursor, NamedTupleCursor, RealDictCursor from sqlalchemy.engine import URL -from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.providers.common.sql.hooks.sql import DbApiHook if TYPE_CHECKING: @@ -92,42 +89,12 @@ class PostgresHook(DbApiHook): def __init__( self, *args, options: str | None = None, enable_log_db_messages: bool = False, **kwargs ) -> None: - if "schema" in kwargs: - warnings.warn( - 'The "schema" arg has been renamed to "database" as it contained the database name.' - 'Please use "database" to set the database name.', - AirflowProviderDeprecationWarning, - stacklevel=2, - ) - kwargs["database"] = kwargs["schema"] super().__init__(*args, **kwargs) self.conn: connection = None self.database: str | None = kwargs.pop("database", None) self.options = options self.enable_log_db_messages = enable_log_db_messages - @property - @deprecated( - reason=( - 'The "schema" variable has been renamed to "database" as it contained the database name.' - 'Please use "database" to get the database name.' - ), - category=AirflowProviderDeprecationWarning, - ) - def schema(self): - return self.database - - @schema.setter - @deprecated( - reason=( - 'The "schema" variable has been renamed to "database" as it contained the database name.' - 'Please use "database" to set the database name.' - ), - category=AirflowProviderDeprecationWarning, - ) - def schema(self, value): - self.database = value - @property def sqlalchemy_url(self) -> URL: conn = self.get_connection(self.get_conn_id()) diff --git a/providers/src/airflow/providers/postgres/operators/__init__.py b/providers/src/airflow/providers/postgres/operators/__init__.py deleted file mode 100644 index 217e5db960782..0000000000000 --- a/providers/src/airflow/providers/postgres/operators/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/providers/src/airflow/providers/postgres/operators/postgres.py b/providers/src/airflow/providers/postgres/operators/postgres.py deleted file mode 100644 index 7fc0a38a4e53b..0000000000000 --- a/providers/src/airflow/providers/postgres/operators/postgres.py +++ /dev/null @@ -1,88 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -import warnings -from collections.abc import Mapping -from typing import ClassVar - -from deprecated import deprecated - -from airflow.exceptions import AirflowProviderDeprecationWarning -from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator - - -@deprecated( - reason=( - "Please use `airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator`." - "Also, you can provide `hook_params={'schema': }`." - ), - category=AirflowProviderDeprecationWarning, -) -class PostgresOperator(SQLExecuteQueryOperator): - """ - Executes sql code in a specific Postgres database. - - This class is deprecated. - - Please use :class:`airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator`. - - :param sql: the SQL code to be executed as a single string, or - a list of str (sql statements), or a reference to a template file. - Template references are recognized by str ending in '.sql' - :param postgres_conn_id: The :ref:`postgres conn id ` - reference to a specific postgres database. - :param autocommit: if True, each command is automatically committed. - (default value: False) - :param parameters: (optional) the parameters to render the SQL query with. - :param database: name of database which overwrite defined one in connection - :param runtime_parameters: a mapping of runtime params added to the final sql being executed. - For example, you could set the schema via `{"search_path": "CUSTOM_SCHEMA"}`. - Deprecated - use `hook_params={'options': '-c '}` instead. - """ - - template_fields_renderers: ClassVar[dict] = { - **SQLExecuteQueryOperator.template_fields_renderers, - "sql": "postgresql", - } - ui_color = "#ededed" - - def __init__( - self, - *, - postgres_conn_id: str = "postgres_default", - database: str | None = None, - runtime_parameters: Mapping | None = None, - **kwargs, - ) -> None: - if database is not None: - hook_params = kwargs.pop("hook_params", {}) - kwargs["hook_params"] = {"database": database, **hook_params} - - if runtime_parameters: - warnings.warn( - """`runtime_parameters` is deprecated. - Please use `hook_params={'options': '-c }`.""", - AirflowProviderDeprecationWarning, - stacklevel=2, - ) - hook_params = kwargs.pop("hook_params", {}) - options = " ".join(f"-c {param}={val}" for param, val in runtime_parameters.items()) - kwargs["hook_params"] = {"options": options, **hook_params} - - super().__init__(conn_id=postgres_conn_id, **kwargs) diff --git a/providers/src/airflow/providers/postgres/provider.yaml b/providers/src/airflow/providers/postgres/provider.yaml index c6b6c1df27234..7e79f85c578fd 100644 --- a/providers/src/airflow/providers/postgres/provider.yaml +++ b/providers/src/airflow/providers/postgres/provider.yaml @@ -85,11 +85,6 @@ integrations: logo: /integration-logos/postgres/Postgres.png tags: [software] -operators: - - integration-name: PostgreSQL - python-modules: - - airflow.providers.postgres.operators.postgres - hooks: - integration-name: PostgreSQL python-modules: diff --git a/providers/tests/postgres/hooks/test_postgres.py b/providers/tests/postgres/hooks/test_postgres.py index 23611a9c70fca..7a720534d4b77 100644 --- a/providers/tests/postgres/hooks/test_postgres.py +++ b/providers/tests/postgres/hooks/test_postgres.py @@ -25,7 +25,6 @@ import psycopg2.extras import pytest -from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.models import Connection from airflow.providers.postgres.hooks.postgres import PostgresHook from airflow.utils.types import NOTSET @@ -340,15 +339,6 @@ def test_get_uri_from_connection_with_database_override(self): ) assert hook.get_uri() == "postgresql://login:password@host:1/database-override" - def test_schema_kwarg_database_kwarg_compatibility(self): - database = "database-override" - with pytest.warns( - AirflowProviderDeprecationWarning, - match='The "schema" arg has been renamed to "database" as it contained the database name.Please use "database" to set the database name.', - ): - hook = PostgresHook(schema=database) - assert hook.database == database - @mock.patch("airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook") @pytest.mark.parametrize("aws_conn_id", [NOTSET, None, "mock_aws_conn"]) @pytest.mark.parametrize("port", [5432, 5439, None]) diff --git a/providers/tests/postgres/operators/__init__.py b/providers/tests/postgres/operators/__init__.py deleted file mode 100644 index 217e5db960782..0000000000000 --- a/providers/tests/postgres/operators/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/providers/tests/postgres/operators/test_postgres.py b/providers/tests/postgres/operators/test_postgres.py deleted file mode 100644 index b7bab1392339e..0000000000000 --- a/providers/tests/postgres/operators/test_postgres.py +++ /dev/null @@ -1,231 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -import pytest - -from airflow.models.dag import DAG -from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator -from airflow.providers.postgres.hooks.postgres import PostgresHook -from airflow.utils import timezone - -DEFAULT_DATE = timezone.datetime(2015, 1, 1) -DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat() -DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10] -TEST_DAG_ID = "unit_test_dag" -POSTGRES_DEFAULT = "postgres_default" - - -@pytest.mark.backend("postgres") -class TestPostgres: - def setup_method(self): - args = {"owner": "airflow", "start_date": DEFAULT_DATE} - dag = DAG(TEST_DAG_ID, schedule=None, default_args=args) - self.dag = dag - - def teardown_method(self): - tables_to_drop = ["test_postgres_to_postgres", "test_airflow"] - - with PostgresHook().get_conn() as conn: - with conn.cursor() as cur: - for table in tables_to_drop: - cur.execute(f"DROP TABLE IF EXISTS {table}") - - def test_postgres_operator_test(self): - sql = """ - CREATE TABLE IF NOT EXISTS test_airflow ( - dummy VARCHAR(50) - ); - """ - op = SQLExecuteQueryOperator( - task_id="basic_postgres", sql=sql, dag=self.dag, conn_id=POSTGRES_DEFAULT - ) - op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - - autocommit_task = SQLExecuteQueryOperator( - task_id="basic_postgres_with_autocommit", - sql=sql, - dag=self.dag, - autocommit=True, - conn_id=POSTGRES_DEFAULT, - ) - autocommit_task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - - def test_postgres_operator_test_multi(self): - sql = [ - "CREATE TABLE IF NOT EXISTS test_airflow (dummy VARCHAR(50))", - "TRUNCATE TABLE test_airflow", - "INSERT INTO test_airflow VALUES ('X')", - ] - op = SQLExecuteQueryOperator( - task_id="postgres_operator_test_multi", sql=sql, dag=self.dag, conn_id=POSTGRES_DEFAULT - ) - op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - - def test_vacuum(self): - """ - Verifies the VACUUM operation runs well with the PostgresOperator - """ - - sql = "VACUUM ANALYZE;" - op = SQLExecuteQueryOperator( - task_id="postgres_operator_test_vacuum", - sql=sql, - dag=self.dag, - autocommit=True, - conn_id=POSTGRES_DEFAULT, - ) - op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - - def test_overwrite_database(self): - """ - Verifies option to overwrite connection database - """ - - sql = "SELECT 1;" - op = SQLExecuteQueryOperator( - task_id="postgres_operator_test_database_overwrite", - sql=sql, - dag=self.dag, - autocommit=True, - database="foobar", - conn_id=POSTGRES_DEFAULT, - ) - - from psycopg2 import OperationalError - - with pytest.raises(OperationalError, match='database "foobar" does not exist'): - op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - - def test_runtime_parameter_setting(self): - """ - Verifies ability to pass server configuration parameters to - PostgresOperator - """ - - sql = "SELECT 1;" - op = SQLExecuteQueryOperator( - task_id="postgres_operator_test_runtime_parameter_setting", - sql=sql, - dag=self.dag, - hook_params={"options": "-c statement_timeout=3000ms"}, - conn_id=POSTGRES_DEFAULT, - ) - op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - assert op.get_db_hook().get_first("SHOW statement_timeout;")[0] == "3s" - - -@pytest.mark.backend("postgres") -class TestPostgresOpenLineage: - custom_schemas = ["another_schema"] - - def setup_method(self): - args = {"owner": "airflow", "start_date": DEFAULT_DATE} - dag = DAG(TEST_DAG_ID, schedule=None, default_args=args) - self.dag = dag - - with PostgresHook().get_conn() as conn: - with conn.cursor() as cur: - for schema in self.custom_schemas: - cur.execute(f"CREATE SCHEMA {schema}") - - def teardown_method(self): - tables_to_drop = ["test_postgres_to_postgres", "test_airflow"] - - with PostgresHook().get_conn() as conn: - with conn.cursor() as cur: - for table in tables_to_drop: - cur.execute(f"DROP TABLE IF EXISTS {table}") - for schema in self.custom_schemas: - cur.execute(f"DROP SCHEMA {schema} CASCADE") - - def test_postgres_operator_openlineage_implicit_schema(self): - sql = """ - CREATE TABLE IF NOT EXISTS test_airflow ( - dummy VARCHAR(50) - ); - """ - op = SQLExecuteQueryOperator( - task_id="basic_postgres", - sql=sql, - dag=self.dag, - hook_params={"options": "-c search_path=another_schema"}, - conn_id=POSTGRES_DEFAULT, - ) - - lineage = op.get_openlineage_facets_on_start() - assert len(lineage.inputs) == 0 - assert len(lineage.outputs) == 0 - op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - - # OpenLineage provider runs same method on complete by default - lineage_on_complete = op.get_openlineage_facets_on_start() - assert len(lineage_on_complete.inputs) == 0 - assert len(lineage_on_complete.outputs) == 1 - assert lineage_on_complete.outputs[0].namespace == "postgres://postgres:5432" - assert lineage_on_complete.outputs[0].name == "airflow.another_schema.test_airflow" - assert "schema" in lineage_on_complete.outputs[0].facets - - def test_postgres_operator_openlineage_explicit_schema(self): - sql = """ - CREATE TABLE IF NOT EXISTS public.test_airflow ( - dummy VARCHAR(50) - ); - """ - op = SQLExecuteQueryOperator( - task_id="basic_postgres", - sql=sql, - dag=self.dag, - hook_params={"options": "-c search_path=another_schema"}, - conn_id=POSTGRES_DEFAULT, - ) - - lineage = op.get_openlineage_facets_on_start() - assert len(lineage.inputs) == 0 - assert len(lineage.outputs) == 0 - op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - - # OpenLineage provider runs same method on complete by default - lineage_on_complete = op.get_openlineage_facets_on_start() - assert len(lineage_on_complete.inputs) == 0 - assert len(lineage_on_complete.outputs) == 1 - assert lineage_on_complete.outputs[0].namespace == "postgres://postgres:5432" - assert lineage_on_complete.outputs[0].name == "airflow.public.test_airflow" - assert "schema" in lineage_on_complete.outputs[0].facets - - -@pytest.mark.db_test -def test_parameters_are_templatized(create_task_instance_of_operator): - """Test that PostgreSQL operator could template the same fields as SQLExecuteQueryOperator""" - ti = create_task_instance_of_operator( - SQLExecuteQueryOperator, - conn_id="{{ param.conn_id }}", - sql="SELECT * FROM {{ param.table }} WHERE spam = %(spam)s;", - parameters={"spam": "{{ param.bar }}"}, - dag_id="test-postgres-op-parameters-are-templatized", - task_id="test-task", - ) - task: SQLExecuteQueryOperator = ti.render_templates( - { - "param": {"conn_id": "pg", "table": "foo", "bar": "egg"}, - "ti": ti, - } - ) - assert task.conn_id == "pg" - assert task.sql == "SELECT * FROM foo WHERE spam = %(spam)s;" - assert task.parameters == {"spam": "egg"}