From 73dcefa15df54e8545d80e6e11d70cdda821e54a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Thu, 6 Feb 2025 07:48:12 +0000 Subject: [PATCH] [DOP-23742] Fix using conn.setReadOnly(...) for MySQL, MSSQL --- docs/changelog/next_release/337.bugfix.rst | 4 ++++ .../db_connection/jdbc_mixin/connection.py | 14 ++++++-------- .../db_connection/mssql/connection.py | 14 ++++++++++++++ .../db_connection/mysql/connection.py | 18 ++++++++++++++++++ .../test_clickhouse_integration.py | 4 ++++ .../test_greenplum_integration.py | 4 ++++ .../test_mssql_integration.py | 4 ++++ .../test_mysql_integration.py | 4 ++++ .../test_oracle_integration.py | 4 ++++ .../test_postgres_integration.py | 6 +++++- 10 files changed, 67 insertions(+), 9 deletions(-) create mode 100644 docs/changelog/next_release/337.bugfix.rst diff --git a/docs/changelog/next_release/337.bugfix.rst b/docs/changelog/next_release/337.bugfix.rst new file mode 100644 index 000000000..6b4613437 --- /dev/null +++ b/docs/changelog/next_release/337.bugfix.rst @@ -0,0 +1,4 @@ +Previously ``MSSQL.fetch(...)`` and ``MySQL.fetch(...)`` opened a read-write connection, despite documentation said that is should be read-only. +Now this is fixed: +* ``MSSQL.fetch(...)`` establishes connection with ``ApplicationIntent=ReadOnly``. +* ``MySQL.fetch(...)`` calls ``SET SESSION TRANSACTION READ ONLY`` statement. diff --git a/onetl/connection/db_connection/jdbc_mixin/connection.py b/onetl/connection/db_connection/jdbc_mixin/connection.py index 3c75e6066..d2bea9b1b 100644 --- a/onetl/connection/db_connection/jdbc_mixin/connection.py +++ b/onetl/connection/db_connection/jdbc_mixin/connection.py @@ -371,10 +371,12 @@ def _options_to_connection_properties(self, options: JDBCFetchOptions | JDBCExec ) return jdbc_options.asConnectionProperties() - def _get_jdbc_connection(self, options: JDBCFetchOptions | JDBCExecuteOptions): + def _get_jdbc_connection(self, options: JDBCFetchOptions | JDBCExecuteOptions, read_only: bool): connection_properties = self._options_to_connection_properties(options) driver_manager = self.spark._jvm.java.sql.DriverManager # type: ignore - return driver_manager.getConnection(self.jdbc_url, connection_properties) + connection = driver_manager.getConnection(self.jdbc_url, connection_properties) + connection.setReadOnly(read_only) # type: ignore + return connection def _get_spark_dialect_name(self) -> str: """ @@ -389,7 +391,6 @@ def _get_spark_dialect(self): def _get_statement_args(self) -> tuple[int, ...]: resultset = self.spark._jvm.java.sql.ResultSet # type: ignore - return resultset.TYPE_FORWARD_ONLY, resultset.CONCUR_READ_ONLY def _execute_on_driver( @@ -409,13 +410,10 @@ def _execute_on_driver( Each time new connection is opened to execute the statement, and then closed. """ - jdbc_connection = self._get_jdbc_connection(options) + statement_args = self._get_statement_args() + jdbc_connection = self._get_jdbc_connection(options, read_only) with closing(jdbc_connection): - jdbc_connection.setReadOnly(read_only) # type: ignore - - statement_args = self._get_statement_args() jdbc_statement = self._build_statement(statement, statement_type, jdbc_connection, statement_args) - return self._execute_statement(jdbc_statement, statement, options, callback, read_only) def _execute_statement( diff --git a/onetl/connection/db_connection/mssql/connection.py b/onetl/connection/db_connection/mssql/connection.py index 502353322..f15b1fb7a 100644 --- a/onetl/connection/db_connection/mssql/connection.py +++ b/onetl/connection/db_connection/mssql/connection.py @@ -10,6 +10,10 @@ from onetl._util.classproperty import classproperty from onetl._util.version import Version from onetl.connection.db_connection.jdbc_connection import JDBCConnection +from onetl.connection.db_connection.jdbc_mixin.options import ( + JDBCExecuteOptions, + JDBCFetchOptions, +) from onetl.connection.db_connection.mssql.dialect import MSSQLDialect from onetl.connection.db_connection.mssql.options import ( MSSQLExecuteOptions, @@ -277,3 +281,13 @@ def __str__(self): port = self.port or 1433 return f"{self.__class__.__name__}[{self.host}:{port}/{self.database}]" + + def _get_jdbc_connection(self, options: JDBCFetchOptions | JDBCExecuteOptions, read_only: bool): + if read_only: + # connection.setReadOnly() is no-op in MSSQL: + # https://learn.microsoft.com/en-us/sql/connect/jdbc/reference/setreadonly-method-sqlserverconnection?view=sql-server-ver16 + # Instead, we should change connection type via option: + # https://github.com/microsoft/mssql-jdbc/issues/484 + options = options.copy(update={"ApplicationIntent": "ReadOnly"}) + + return super()._get_jdbc_connection(options, read_only) diff --git a/onetl/connection/db_connection/mysql/connection.py b/onetl/connection/db_connection/mysql/connection.py index 2588d79a3..ef239f450 100644 --- a/onetl/connection/db_connection/mysql/connection.py +++ b/onetl/connection/db_connection/mysql/connection.py @@ -3,6 +3,7 @@ from __future__ import annotations import warnings +from contextlib import closing from typing import ClassVar, Optional from etl_entities.instance import Host @@ -10,6 +11,10 @@ from onetl._util.classproperty import classproperty from onetl._util.version import Version from onetl.connection.db_connection.jdbc_connection import JDBCConnection +from onetl.connection.db_connection.jdbc_mixin.options import ( + JDBCExecuteOptions, + JDBCFetchOptions, +) from onetl.connection.db_connection.mysql.dialect import MySQLDialect from onetl.connection.db_connection.mysql.options import ( MySQLExecuteOptions, @@ -178,3 +183,16 @@ def instance_url(self) -> str: def __str__(self): return f"{self.__class__.__name__}[{self.host}:{self.port}]" + + def _get_jdbc_connection(self, options: JDBCFetchOptions | JDBCExecuteOptions, read_only: bool): + connection = super()._get_jdbc_connection(options, read_only) + + # connection.setReadOnly() is no-op in MySQL JDBC driver. Session type can be changed by statement: + # https://stackoverflow.com/questions/10240890/sql-open-connection-in-read-only-mode#comment123789248_48959180 + # https://dev.mysql.com/doc/refman/8.4/en/set-transaction.html + transaction = "READ ONLY" if read_only else "READ WRITE" + statement = connection.prepareStatement(f"SET SESSION TRANSACTION {transaction};") + with closing(statement): + statement.execute() + + return connection diff --git a/tests/tests_integration/tests_db_connection_integration/test_clickhouse_integration.py b/tests/tests_integration/tests_db_connection_integration/test_clickhouse_integration.py index 31d02c125..a4a963628 100644 --- a/tests/tests_integration/tests_db_connection_integration/test_clickhouse_integration.py +++ b/tests/tests_integration/tests_db_connection_integration/test_clickhouse_integration.py @@ -132,6 +132,10 @@ def test_clickhouse_connection_fetch(spark, processing, load_table_data, suffix, with pytest.raises(Exception): clickhouse.fetch(f"SELEC 1{suffix}") + # fetch is always read-only + with pytest.raises(Exception): + clickhouse.fetch(f"DROP TABLE {table}{suffix}") + @pytest.mark.parametrize("suffix", ["", ";"]) def test_clickhouse_connection_execute_ddl(spark, processing, get_schema_table, suffix): diff --git a/tests/tests_integration/tests_db_connection_integration/test_greenplum_integration.py b/tests/tests_integration/tests_db_connection_integration/test_greenplum_integration.py index 6775244ee..27ca23224 100644 --- a/tests/tests_integration/tests_db_connection_integration/test_greenplum_integration.py +++ b/tests/tests_integration/tests_db_connection_integration/test_greenplum_integration.py @@ -92,6 +92,10 @@ def test_greenplum_connection_fetch(spark, processing, load_table_data, suffix): with pytest.raises(Exception): greenplum.fetch(f"SELEC 1{suffix}") + # fetch is read-only + with pytest.raises(Exception): + greenplum.fetch(f"DROP TABLE {table}{suffix}") + @pytest.mark.parametrize("suffix", ["", ";"]) def test_greenplum_connection_ddl(spark, processing, get_schema_table, suffix): diff --git a/tests/tests_integration/tests_db_connection_integration/test_mssql_integration.py b/tests/tests_integration/tests_db_connection_integration/test_mssql_integration.py index 4fad8a754..8830d9663 100644 --- a/tests/tests_integration/tests_db_connection_integration/test_mssql_integration.py +++ b/tests/tests_integration/tests_db_connection_integration/test_mssql_integration.py @@ -131,6 +131,10 @@ def test_mssql_connection_fetch(spark, processing, load_table_data, suffix): with pytest.raises(Exception): mssql.fetch(f"SELEC 1{suffix}") + # fetch is always read-only + with pytest.raises(Exception): + mssql.fetch(f"DROP TABLE {table}{suffix}") + @pytest.mark.parametrize("suffix", ["", ";"]) def test_mssql_connection_execute_ddl(spark, processing, get_schema_table, suffix): diff --git a/tests/tests_integration/tests_db_connection_integration/test_mysql_integration.py b/tests/tests_integration/tests_db_connection_integration/test_mysql_integration.py index c75546768..9c50e50cd 100644 --- a/tests/tests_integration/tests_db_connection_integration/test_mysql_integration.py +++ b/tests/tests_integration/tests_db_connection_integration/test_mysql_integration.py @@ -130,6 +130,10 @@ def test_mysql_connection_fetch(spark, processing, load_table_data, suffix): with pytest.raises(Exception): mysql.fetch(f"SELEC 1{suffix}") + # fetch is always read-only + with pytest.raises(Exception): + mysql.fetch(f"DROP TABLE {table}{suffix}") + @pytest.mark.parametrize("suffix", ["", ";"]) def test_mysql_connection_execute_ddl(spark, processing, get_schema_table, suffix): diff --git a/tests/tests_integration/tests_db_connection_integration/test_oracle_integration.py b/tests/tests_integration/tests_db_connection_integration/test_oracle_integration.py index 3bc2c9cfe..838c6fe78 100644 --- a/tests/tests_integration/tests_db_connection_integration/test_oracle_integration.py +++ b/tests/tests_integration/tests_db_connection_integration/test_oracle_integration.py @@ -132,6 +132,10 @@ def test_oracle_connection_fetch(spark, processing, load_table_data, suffix): filtered_df = table_df[table_df.ID_INT < 50] processing.assert_equal_df(df=df, other_frame=filtered_df, order_by="id_int") + # fetch is always read-only + with pytest.raises(Exception): + oracle.fetch(f"DROP TABLE {table}{suffix}") + # not supported by JDBC, use SELECT * FROM v$tables with pytest.raises(Exception): oracle.fetch(f"SHOW TABLES{suffix}") diff --git a/tests/tests_integration/tests_db_connection_integration/test_postgres_integration.py b/tests/tests_integration/tests_db_connection_integration/test_postgres_integration.py index ead0275e2..5bbd29354 100644 --- a/tests/tests_integration/tests_db_connection_integration/test_postgres_integration.py +++ b/tests/tests_integration/tests_db_connection_integration/test_postgres_integration.py @@ -113,9 +113,13 @@ def test_postgres_connection_fetch(spark, processing, load_table_data, suffix, c with pytest.raises(Exception): postgres.fetch(f"SELEC 1{suffix}") + # fetch is read-only + with pytest.raises(Exception): + postgres.fetch(f"DROP TABLE {table}{suffix}") + @pytest.mark.parametrize("suffix", ["", ";"]) -def test_postgres_connection_ddl(spark, processing, get_schema_table, suffix): +def test_postgres_connection_execute_ddl(spark, processing, get_schema_table, suffix): postgres = Postgres( host=processing.host, port=processing.port,