diff --git a/lib/dl_connector_mysql/dl_connector_mysql/core/async_adapters_mysql.py b/lib/dl_connector_mysql/dl_connector_mysql/core/async_adapters_mysql.py index 21693f7e4..d8fcc16a0 100644 --- a/lib/dl_connector_mysql/dl_connector_mysql/core/async_adapters_mysql.py +++ b/lib/dl_connector_mysql/dl_connector_mysql/core/async_adapters_mysql.py @@ -163,40 +163,36 @@ async def _execute_by_steps(self, db_adapter_query: DBAdapterQuery) -> AsyncIter debug_query = query if isinstance(query, str) else compile_query_for_debug(query, self._dialect) with self.handle_execution_error(debug_query): - async with self._get_connection( - db_adapter_query.db_name - ) as conn: # type: aiomysql.sa.connection.SAConnection - async with conn.begin(): - await conn.execute("SET SESSION TRANSACTION READ ONLY") - result = await conn.execute(compiled_query, compiled_query_parameters) - cursor_info = ExecutionStepCursorInfo( - cursor_info=self._make_cursor_info(result.cursor), - raw_cursor_description=list(result.cursor.description), - ) - yield cursor_info - - row_converters = self._get_row_converters(cursor_info=cursor_info) - while True: - LOGGER.info("Fetching %s rows (conn %s)", chunk_size, conn) - rows = await result.fetchmany(chunk_size) - if not rows: - LOGGER.info("No rows remaining") - break - - LOGGER.info("Rows fetched, yielding") - yield ExecutionStepDataChunk( + async with self._get_connection(db_adapter_query.db_name) as conn: + result = await conn.execute(compiled_query, compiled_query_parameters) + cursor_info = ExecutionStepCursorInfo( + cursor_info=self._make_cursor_info(result.cursor), + raw_cursor_description=list(result.cursor.description), + ) + yield cursor_info + + row_converters = self._get_row_converters(cursor_info=cursor_info) + while True: + LOGGER.info("Fetching %s rows (conn %s)", chunk_size, conn) + rows = await result.fetchmany(chunk_size) + if not rows: + LOGGER.info("No rows remaining") + break + + LOGGER.info("Rows fetched, yielding") + yield ExecutionStepDataChunk( + tuple( tuple( - tuple( - (col_converter(val) if col_converter is not None and val is not None else val) - for val, col_converter in zip( - [row[col_name] for col_name in cursor_info.cursor_info["names"]], - row_converters, - strict=True, - ) + (col_converter(val) if col_converter is not None and val is not None else val) + for val, col_converter in zip( + [row[col_name] for col_name in cursor_info.cursor_info["names"]], + row_converters, + strict=True, ) - for row in rows ) + for row in rows ) + ) @generic_profiler_async("db-full") # type: ignore # TODO: fix async def execute(self, query: DBAdapterQuery) -> AsyncRawExecutionResult: