Skip to content

Commit

Permalink
Revert "BI-5267 Readonly transaction in MySQL connector (#389)" (#460)
Browse files Browse the repository at this point in the history
This reverts commit 8ff9a8a.
  • Loading branch information
KonstantAnxiety authored May 24, 2024
1 parent 7586527 commit 5543f94
Showing 1 changed file with 26 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,40 +163,36 @@ async def _execute_by_steps(self, db_adapter_query: DBAdapterQuery) -> AsyncIter
debug_query = query if isinstance(query, str) else compile_query_for_debug(query, self._dialect)

with self.handle_execution_error(debug_query):
async with self._get_connection(
db_adapter_query.db_name
) as conn: # type: aiomysql.sa.connection.SAConnection
async with conn.begin():
await conn.execute("SET SESSION TRANSACTION READ ONLY")
result = await conn.execute(compiled_query, compiled_query_parameters)
cursor_info = ExecutionStepCursorInfo(
cursor_info=self._make_cursor_info(result.cursor),
raw_cursor_description=list(result.cursor.description),
)
yield cursor_info

row_converters = self._get_row_converters(cursor_info=cursor_info)
while True:
LOGGER.info("Fetching %s rows (conn %s)", chunk_size, conn)
rows = await result.fetchmany(chunk_size)
if not rows:
LOGGER.info("No rows remaining")
break

LOGGER.info("Rows fetched, yielding")
yield ExecutionStepDataChunk(
async with self._get_connection(db_adapter_query.db_name) as conn:
result = await conn.execute(compiled_query, compiled_query_parameters)
cursor_info = ExecutionStepCursorInfo(
cursor_info=self._make_cursor_info(result.cursor),
raw_cursor_description=list(result.cursor.description),
)
yield cursor_info

row_converters = self._get_row_converters(cursor_info=cursor_info)
while True:
LOGGER.info("Fetching %s rows (conn %s)", chunk_size, conn)
rows = await result.fetchmany(chunk_size)
if not rows:
LOGGER.info("No rows remaining")
break

LOGGER.info("Rows fetched, yielding")
yield ExecutionStepDataChunk(
tuple(
tuple(
tuple(
(col_converter(val) if col_converter is not None and val is not None else val)
for val, col_converter in zip(
[row[col_name] for col_name in cursor_info.cursor_info["names"]],
row_converters,
strict=True,
)
(col_converter(val) if col_converter is not None and val is not None else val)
for val, col_converter in zip(
[row[col_name] for col_name in cursor_info.cursor_info["names"]],
row_converters,
strict=True,
)
for row in rows
)
for row in rows
)
)

@generic_profiler_async("db-full") # type: ignore # TODO: fix
async def execute(self, query: DBAdapterQuery) -> AsyncRawExecutionResult:
Expand Down

0 comments on commit 5543f94

Please sign in to comment.