Skip to content

Commit

Permalink
BI-5267 Readonly transaction in MySQL connector (#389)
Browse files Browse the repository at this point in the history
  • Loading branch information
KonstantAnxiety authored Apr 12, 2024
1 parent 8f8555f commit 8ff9a8a
Showing 1 changed file with 30 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,36 +163,40 @@ async def _execute_by_steps(self, db_adapter_query: DBAdapterQuery) -> AsyncIter
debug_query = query if isinstance(query, str) else compile_query_for_debug(query, self._dialect)

with self.handle_execution_error(debug_query):
async with self._get_connection(db_adapter_query.db_name) as conn:
result = await conn.execute(compiled_query, compiled_query_parameters)
cursor_info = ExecutionStepCursorInfo(
cursor_info=self._make_cursor_info(result.cursor),
raw_cursor_description=list(result.cursor.description),
)
yield cursor_info

row_converters = self._get_row_converters(cursor_info=cursor_info)
while True:
LOGGER.info("Fetching %s rows (conn %s)", chunk_size, conn)
rows = await result.fetchmany(chunk_size)
if not rows:
LOGGER.info("No rows remaining")
break

LOGGER.info("Rows fetched, yielding")
yield ExecutionStepDataChunk(
tuple(
async with self._get_connection(
db_adapter_query.db_name
) as conn: # type: aiomysql.sa.connection.SAConnection
async with conn.begin():
await conn.execute("SET SESSION TRANSACTION READ ONLY")
result = await conn.execute(compiled_query, compiled_query_parameters)
cursor_info = ExecutionStepCursorInfo(
cursor_info=self._make_cursor_info(result.cursor),
raw_cursor_description=list(result.cursor.description),
)
yield cursor_info

row_converters = self._get_row_converters(cursor_info=cursor_info)
while True:
LOGGER.info("Fetching %s rows (conn %s)", chunk_size, conn)
rows = await result.fetchmany(chunk_size)
if not rows:
LOGGER.info("No rows remaining")
break

LOGGER.info("Rows fetched, yielding")
yield ExecutionStepDataChunk(
tuple(
(col_converter(val) if col_converter is not None and val is not None else val)
for val, col_converter in zip(
[row[col_name] for col_name in cursor_info.cursor_info["names"]],
row_converters,
strict=True,
tuple(
(col_converter(val) if col_converter is not None and val is not None else val)
for val, col_converter in zip(
[row[col_name] for col_name in cursor_info.cursor_info["names"]],
row_converters,
strict=True,
)
)
for row in rows
)
for row in rows
)
)

@generic_profiler_async("db-full") # type: ignore # TODO: fix
async def execute(self, query: DBAdapterQuery) -> AsyncRawExecutionResult:
Expand Down

0 comments on commit 8ff9a8a

Please sign in to comment.