From 93d7555d622c622091200f2e85edce0b11241a43 Mon Sep 17 00:00:00 2001 From: "m.kindritskiy" Date: Sun, 15 Sep 2024 22:54:28 +0300 Subject: [PATCH] Fix aiopg + sqlalchemy >= 1.4 compatibility issue Closes: #97 Supersedes: PR-131 --- hiku/sources/aiopg.py | 44 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/hiku/sources/aiopg.py b/hiku/sources/aiopg.py index 10acfebf..b5cd6062 100644 --- a/hiku/sources/aiopg.py +++ b/hiku/sources/aiopg.py @@ -3,9 +3,12 @@ Iterable, Any, List, + Iterator, + Tuple, ) import sqlalchemy +from sqlalchemy.sql import Select from sqlalchemy import any_ from sqlalchemy.sql.elements import BinaryExpression @@ -17,12 +20,53 @@ FETCH_SIZE = 100 +def _uniq_fields(fields: List[Field]) -> Iterator[Field]: + visited = set() + for f in fields: + if f.name not in visited: + visited.add(f.name) + yield f + + class FieldsQuery(_sa.FieldsQuery): def in_impl( self, column: sqlalchemy.Column, values: Iterable ) -> BinaryExpression: return column == any_(values) + def select_expr( + self, fields_: List[Field], ids: Iterable + ) -> Tuple[Select, Callable]: + visited = set() + columns = [] + query_columns = [] + for f in fields_: + column = self.from_clause.c[f.name] + columns.append(column) + + if f.name not in visited and column != self.primary_key: + visited.add(f.name) + query_columns.append(column) + + expr = ( + sqlalchemy.select( + *_sa._process_select_params([self.primary_key] + query_columns) + ) + .select_from(self.from_clause) + .where(self.in_impl(self.primary_key, ids)) + ) + + def result_proc(rows: List[_sa.Row]) -> List: + rows_map = { + row[self.primary_key]: [row[c] for c in columns] + for row in map(_sa._process_result_row, rows) + } + + nulls = [None for _ in fields_] + return [rows_map.get(id_, nulls) for id_ in ids] + + return expr, result_proc + async def __call__( self, ctx: Context, fields_: List[Field], ids: Iterable ) -> List: