[DOP-23742] Fix using conn.setReadOnly(...) for MySQL, MSSQL #337

Merged 1 commit on Feb 6, 2025
4 changes: 4 additions & 0 deletions docs/changelog/next_release/337.bugfix.rst
@@ -0,0 +1,4 @@
Previously ``MSSQL.fetch(...)`` and ``MySQL.fetch(...)`` opened a read-write connection, despite the documentation stating it should be read-only.
Now this is fixed:
* ``MSSQL.fetch(...)`` establishes the connection with ``ApplicationIntent=ReadOnly``.
* ``MySQL.fetch(...)`` executes a ``SET SESSION TRANSACTION READ ONLY`` statement.
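
For context, here is a minimal usage sketch of the behavior described above. It assumes a running Spark session and a reachable MySQL instance; host, credentials and table name are placeholders:

from pyspark.sql import SparkSession

from onetl.connection import MySQL

# MySQL.get_packages() lists the JDBC driver coordinates to put on the Spark classpath.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", ",".join(MySQL.get_packages()))
    .getOrCreate()
)

mysql = MySQL(
    host="mysql.example.com",  # placeholder connection details
    user="onetl",
    password="***",
    database="app",
    spark=spark,
)

# Read queries work as before; fetch() returns a Spark DataFrame.
df = mysql.fetch("SELECT 1 AS check_value")

# Write statements now fail, because fetch() opens a read-only session.
mysql.fetch("DROP TABLE app.some_table")  # raises an exception
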
14 changes: 6 additions & 8 deletions onetl/connection/db_connection/jdbc_mixin/connection.py
@@ -371,10 +371,12 @@ def _options_to_connection_properties(self, options: JDBCFetchOptions | JDBCExec
)
return jdbc_options.asConnectionProperties()

def _get_jdbc_connection(self, options: JDBCFetchOptions | JDBCExecuteOptions):
def _get_jdbc_connection(self, options: JDBCFetchOptions | JDBCExecuteOptions, read_only: bool):
connection_properties = self._options_to_connection_properties(options)
driver_manager = self.spark._jvm.java.sql.DriverManager # type: ignore
return driver_manager.getConnection(self.jdbc_url, connection_properties)
connection = driver_manager.getConnection(self.jdbc_url, connection_properties)
connection.setReadOnly(read_only) # type: ignore
return connection

def _get_spark_dialect_name(self) -> str:
"""
@@ -389,7 +391,6 @@ def _get_spark_dialect(self):

def _get_statement_args(self) -> tuple[int, ...]:
resultset = self.spark._jvm.java.sql.ResultSet # type: ignore

return resultset.TYPE_FORWARD_ONLY, resultset.CONCUR_READ_ONLY

def _execute_on_driver(
@@ -409,13 +410,10 @@ def _execute_on_driver(
Each time, a new connection is opened to execute the statement, and then closed.
"""

jdbc_connection = self._get_jdbc_connection(options)
statement_args = self._get_statement_args()
jdbc_connection = self._get_jdbc_connection(options, read_only)
with closing(jdbc_connection):
jdbc_connection.setReadOnly(read_only) # type: ignore

statement_args = self._get_statement_args()
jdbc_statement = self._build_statement(statement, statement_type, jdbc_connection, statement_args)

return self._execute_statement(jdbc_statement, statement, options, callback, read_only)

def _execute_statement(
14 changes: 14 additions & 0 deletions onetl/connection/db_connection/mssql/connection.py
@@ -10,6 +10,10 @@
from onetl._util.classproperty import classproperty
from onetl._util.version import Version
from onetl.connection.db_connection.jdbc_connection import JDBCConnection
from onetl.connection.db_connection.jdbc_mixin.options import (
JDBCExecuteOptions,
JDBCFetchOptions,
)
from onetl.connection.db_connection.mssql.dialect import MSSQLDialect
from onetl.connection.db_connection.mssql.options import (
MSSQLExecuteOptions,
@@ -277,3 +281,13 @@ def __str__(self):

port = self.port or 1433
return f"{self.__class__.__name__}[{self.host}:{port}/{self.database}]"

def _get_jdbc_connection(self, options: JDBCFetchOptions | JDBCExecuteOptions, read_only: bool):
if read_only:
# connection.setReadOnly() is a no-op in MSSQL:
# https://learn.microsoft.com/en-us/sql/connect/jdbc/reference/setreadonly-method-sqlserverconnection?view=sql-server-ver16
# Instead, the connection type has to be changed via a connection option:
# https://github.com/microsoft/mssql-jdbc/issues/484
options = options.copy(update={"ApplicationIntent": "ReadOnly"})

return super()._get_jdbc_connection(options, read_only)
18 changes: 18 additions & 0 deletions onetl/connection/db_connection/mysql/connection.py
@@ -3,13 +3,18 @@
from __future__ import annotations

import warnings
from contextlib import closing
from typing import ClassVar, Optional

from etl_entities.instance import Host

from onetl._util.classproperty import classproperty
from onetl._util.version import Version
from onetl.connection.db_connection.jdbc_connection import JDBCConnection
from onetl.connection.db_connection.jdbc_mixin.options import (
JDBCExecuteOptions,
JDBCFetchOptions,
)
from onetl.connection.db_connection.mysql.dialect import MySQLDialect
from onetl.connection.db_connection.mysql.options import (
MySQLExecuteOptions,
@@ -178,3 +183,16 @@ def instance_url(self) -> str:

def __str__(self):
return f"{self.__class__.__name__}[{self.host}:{self.port}]"

def _get_jdbc_connection(self, options: JDBCFetchOptions | JDBCExecuteOptions, read_only: bool):
connection = super()._get_jdbc_connection(options, read_only)

# connection.setReadOnly() is a no-op in the MySQL JDBC driver. The session access mode can only be changed with a statement:
# https://stackoverflow.com/questions/10240890/sql-open-connection-in-read-only-mode#comment123789248_48959180
# https://dev.mysql.com/doc/refman/8.4/en/set-transaction.html
transaction = "READ ONLY" if read_only else "READ WRITE"
statement = connection.prepareStatement(f"SET SESSION TRANSACTION {transaction};")
with closing(statement):
statement.execute()

return connection
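
As a sanity check, the effect of this statement can be observed from the same ``fetch(...)`` call, since the session access mode is exposed via MySQL's ``transaction_read_only`` system variable (a sketch reusing the ``mysql`` connection from the example near the changelog entry; on older MySQL releases the variable is named ``tx_read_only``):

# fetch() opens a fresh connection, runs SET SESSION TRANSACTION READ ONLY,
# and then executes the query on that same connection.
df = mysql.fetch("SELECT @@session.transaction_read_only AS read_only")
assert df.collect()[0]["read_only"] == 1
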
@@ -132,6 +132,10 @@ def test_clickhouse_connection_fetch(spark, processing, load_table_data, suffix,
with pytest.raises(Exception):
clickhouse.fetch(f"SELEC 1{suffix}")

# fetch is always read-only
with pytest.raises(Exception):
clickhouse.fetch(f"DROP TABLE {table}{suffix}")


@pytest.mark.parametrize("suffix", ["", ";"])
def test_clickhouse_connection_execute_ddl(spark, processing, get_schema_table, suffix):
@@ -92,6 +92,10 @@ def test_greenplum_connection_fetch(spark, processing, load_table_data, suffix):
with pytest.raises(Exception):
greenplum.fetch(f"SELEC 1{suffix}")

# fetch is read-only
with pytest.raises(Exception):
greenplum.fetch(f"DROP TABLE {table}{suffix}")


@pytest.mark.parametrize("suffix", ["", ";"])
def test_greenplum_connection_ddl(spark, processing, get_schema_table, suffix):
@@ -131,6 +131,10 @@ def test_mssql_connection_fetch(spark, processing, load_table_data, suffix):
with pytest.raises(Exception):
mssql.fetch(f"SELEC 1{suffix}")

# fetch is always read-only
with pytest.raises(Exception):
mssql.fetch(f"DROP TABLE {table}{suffix}")


@pytest.mark.parametrize("suffix", ["", ";"])
def test_mssql_connection_execute_ddl(spark, processing, get_schema_table, suffix):
@@ -130,6 +130,10 @@ def test_mysql_connection_fetch(spark, processing, load_table_data, suffix):
with pytest.raises(Exception):
mysql.fetch(f"SELEC 1{suffix}")

# fetch is always read-only
with pytest.raises(Exception):
mysql.fetch(f"DROP TABLE {table}{suffix}")


@pytest.mark.parametrize("suffix", ["", ";"])
def test_mysql_connection_execute_ddl(spark, processing, get_schema_table, suffix):
@@ -132,6 +132,10 @@ def test_oracle_connection_fetch(spark, processing, load_table_data, suffix):
filtered_df = table_df[table_df.ID_INT < 50]
processing.assert_equal_df(df=df, other_frame=filtered_df, order_by="id_int")

# fetch is always read-only
with pytest.raises(Exception):
oracle.fetch(f"DROP TABLE {table}{suffix}")

# not supported by JDBC, use SELECT * FROM v$tables
with pytest.raises(Exception):
oracle.fetch(f"SHOW TABLES{suffix}")
@@ -113,9 +113,13 @@ def test_postgres_connection_fetch(spark, processing, load_table_data, suffix, c
with pytest.raises(Exception):
postgres.fetch(f"SELEC 1{suffix}")

# fetch is read-only
with pytest.raises(Exception):
postgres.fetch(f"DROP TABLE {table}{suffix}")


@pytest.mark.parametrize("suffix", ["", ";"])
def test_postgres_connection_ddl(spark, processing, get_schema_table, suffix):
def test_postgres_connection_execute_ddl(spark, processing, get_schema_table, suffix):
postgres = Postgres(
host=processing.host,
port=processing.port,