From 8ff9a8a1d3585af27249050c00267213212ac05b Mon Sep 17 00:00:00 2001 From: KonstantAnxiety <58992437+KonstantAnxiety@users.noreply.github.com> Date: Fri, 12 Apr 2024 12:01:18 +0300 Subject: [PATCH] BI-5267 Readonly transaction in MySQL connector (#389) --- .../core/async_adapters_mysql.py | 56 ++++++++++--------- 1 file changed, 30 insertions(+), 26 deletions(-) diff --git a/lib/dl_connector_mysql/dl_connector_mysql/core/async_adapters_mysql.py b/lib/dl_connector_mysql/dl_connector_mysql/core/async_adapters_mysql.py index d8fcc16a0..21693f7e4 100644 --- a/lib/dl_connector_mysql/dl_connector_mysql/core/async_adapters_mysql.py +++ b/lib/dl_connector_mysql/dl_connector_mysql/core/async_adapters_mysql.py @@ -163,36 +163,40 @@ async def _execute_by_steps(self, db_adapter_query: DBAdapterQuery) -> AsyncIter debug_query = query if isinstance(query, str) else compile_query_for_debug(query, self._dialect) with self.handle_execution_error(debug_query): - async with self._get_connection(db_adapter_query.db_name) as conn: - result = await conn.execute(compiled_query, compiled_query_parameters) - cursor_info = ExecutionStepCursorInfo( - cursor_info=self._make_cursor_info(result.cursor), - raw_cursor_description=list(result.cursor.description), - ) - yield cursor_info - - row_converters = self._get_row_converters(cursor_info=cursor_info) - while True: - LOGGER.info("Fetching %s rows (conn %s)", chunk_size, conn) - rows = await result.fetchmany(chunk_size) - if not rows: - LOGGER.info("No rows remaining") - break - - LOGGER.info("Rows fetched, yielding") - yield ExecutionStepDataChunk( - tuple( + async with self._get_connection( + db_adapter_query.db_name + ) as conn: # type: aiomysql.sa.connection.SAConnection + async with conn.begin(): + await conn.execute("SET SESSION TRANSACTION READ ONLY") + result = await conn.execute(compiled_query, compiled_query_parameters) + cursor_info = ExecutionStepCursorInfo( + cursor_info=self._make_cursor_info(result.cursor), + raw_cursor_description=list(result.cursor.description), + ) + yield cursor_info + + row_converters = self._get_row_converters(cursor_info=cursor_info) + while True: + LOGGER.info("Fetching %s rows (conn %s)", chunk_size, conn) + rows = await result.fetchmany(chunk_size) + if not rows: + LOGGER.info("No rows remaining") + break + + LOGGER.info("Rows fetched, yielding") + yield ExecutionStepDataChunk( tuple( - (col_converter(val) if col_converter is not None and val is not None else val) - for val, col_converter in zip( - [row[col_name] for col_name in cursor_info.cursor_info["names"]], - row_converters, - strict=True, + tuple( + (col_converter(val) if col_converter is not None and val is not None else val) + for val, col_converter in zip( + [row[col_name] for col_name in cursor_info.cursor_info["names"]], + row_converters, + strict=True, + ) ) + for row in rows ) - for row in rows ) - ) @generic_profiler_async("db-full") # type: ignore # TODO: fix async def execute(self, query: DBAdapterQuery) -> AsyncRawExecutionResult: