
Commit

Refactored selectors and data processors for new caches
altvod committed Oct 27, 2023
1 parent 2883a95 commit f3f3c25
Showing 38 changed files with 981 additions and 668 deletions.
@@ -9,12 +9,21 @@
 from dl_compeng_pg.compeng_pg_base.data_processor_service_pg import CompEngPgService
 from dl_compeng_pg.compeng_pg_base.pool_base import BasePgPoolWrapper
 from dl_core.data_processing.processing.processor import OperationProcessorAsyncBase
+from dl_core.services_registry.top_level import ServicesRegistry


 @attr.s
 class AiopgCompEngService(CompEngPgService[AiopgPoolWrapper]):
     def _get_pool_wrapper_cls(self) -> Type[BasePgPoolWrapper]:
         return AiopgPoolWrapper

-    def get_data_processor(self) -> OperationProcessorAsyncBase:
-        return AiopgOperationProcessor(pg_pool=self.pool)
+    def get_data_processor(
+        self,
+        service_registry: ServicesRegistry,
+        reporting_enabled: bool,
+    ) -> OperationProcessorAsyncBase:
+        return AiopgOperationProcessor(
+            service_registry=service_registry,
+            pg_pool=self.pool,
+            reporting_enabled=reporting_enabled,
+        )
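The service now requires a ServicesRegistry and a reporting flag when handing out a processor. A minimal sketch of how a caller might use the new signature; the `compeng_service` and `service_registry` objects are assumptions not shown in this commit:

    # Hypothetical caller; assumes an already-initialized AiopgCompEngService
    # and a ServicesRegistry instance obtained elsewhere.
    processor = compeng_service.get_data_processor(
        service_registry=service_registry,  # forwarded to the exec adapter
        reporting_enabled=True,             # enables reporting in the adapter
    )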
@@ -4,6 +4,8 @@
 from typing import (
     TYPE_CHECKING,
     AsyncGenerator,
+    Awaitable,
+    Callable,
     ClassVar,
     Optional,
     Sequence,
@@ -17,7 +19,9 @@

 from dl_compeng_pg.compeng_pg_base.exec_adapter_base import PostgreSQLExecAdapterAsync
 from dl_constants.enums import UserDataType
-from dl_core.data_processing.prepared_components.primitives import PreparedMultiFromInfo
+from dl_core.data_processing.cache.primitives import LocalKeyRepresentation
+from dl_core.data_processing.prepared_components.primitives import PreparedFromInfo
+from dl_core.data_processing.processing.context import OpExecutionContext
 from dl_core.data_processing.streaming import (
     AsyncChunked,
     AsyncChunkedBase,
@@ -49,12 +53,18 @@ async def _execute_ddl(self, query: Union[str, Executable]) -> None:
     async def _execute_and_fetch(
         self,
         *,
-        query: Union[Select, str],
+        query: Select | str,
         user_types: Sequence[UserDataType],
         chunk_size: int,
-        joint_dsrc_info: Optional[PreparedMultiFromInfo] = None,
+        joint_dsrc_info: Optional[PreparedFromInfo] = None,
+        query_id: str,
+        ctx: OpExecutionContext,
+        data_key: LocalKeyRepresentation,
+        preparation_callback: Optional[Callable[[], Awaitable[None]]],
     ) -> AsyncChunkedBase[Sequence[TBIDataValue]]:
+        if preparation_callback is not None:
+            await preparation_callback()

         async def chunked_data_gen() -> AsyncGenerator[list[list[TBIDataValue]], None]:
             """Fetch data in chunks"""
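The new `preparation_callback` argument lets the processing layer defer work (for example, populating a compeng table or warming a cache) until just before the rows are actually fetched. A self-contained sketch of the same pattern, independent of the dl_core types; every name here is illustrative, not part of this commit:

    import asyncio
    from typing import Awaitable, Callable, Optional


    async def fetch_rows(
        query: str,
        preparation_callback: Optional[Callable[[], Awaitable[None]]] = None,
    ) -> list[tuple]:
        # Await the deferred preparation step (if any) right before fetching,
        # mirroring how _execute_and_fetch awaits its callback first.
        if preparation_callback is not None:
            await preparation_callback()
        # ... execute `query` and return rows; stubbed out in this sketch.
        return []


    async def main() -> None:
        async def prepare() -> None:
            print("populating temporary table before the SELECT")

        await fetch_rows("SELECT 1", preparation_callback=prepare)


    asyncio.run(main())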
lib/dl_compeng_pg/dl_compeng_pg/compeng_aiopg/processor_aiopg.py (11 changes: 8 additions & 3 deletions)
@@ -9,13 +9,18 @@


 @attr.s
-class AiopgOperationProcessor(PostgreSQLOperationProcessor[AiopgExecAdapter, AiopgPoolWrapper, aiopg.sa.SAConnection]):
+class AiopgOperationProcessor(PostgreSQLOperationProcessor[AiopgPoolWrapper, aiopg.sa.SAConnection]):
     async def start(self) -> None:
         self._pg_conn = await self._pg_pool.pool.acquire()
-        self._pgex_adapter = AiopgExecAdapter(conn=self._pg_conn)  # type: ignore  # TODO: fix
+        self._db_ex_adapter = AiopgExecAdapter(
+            service_registry=self.service_registry,
+            reporting_enabled=self._reporting_enabled,
+            conn=self._pg_conn,
+            cache_options_builder=self._cache_options_builder,
+        )  # type: ignore  # TODO: fix

     async def end(self) -> None:
-        self._pgex_adapter = None
+        self._db_ex_adapter = None
         if self._pg_conn is not None:
             await self._pg_conn.close()
         self._pg_conn = None
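The processor keeps its acquire-on-start / release-on-end lifecycle; only the adapter construction changed. A rough driver sketch under that assumption (the surrounding code and error handling are not part of this diff):

    # Hypothetical driver; `processor` is the object returned by
    # get_data_processor() above.
    await processor.start()    # acquires a connection and builds the exec adapter
    try:
        ...                    # run operations through the processor
    finally:
        await processor.end()  # drops the adapter and closes the connection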
@@ -9,12 +9,21 @@
 from dl_compeng_pg.compeng_pg_base.data_processor_service_pg import CompEngPgService
 from dl_compeng_pg.compeng_pg_base.pool_base import BasePgPoolWrapper
 from dl_core.data_processing.processing.processor import OperationProcessorAsyncBase
+from dl_core.services_registry.top_level import ServicesRegistry


 @attr.s
 class AsyncpgCompEngService(CompEngPgService[AsyncpgPoolWrapper]):
     def _get_pool_wrapper_cls(self) -> Type[BasePgPoolWrapper]:
         return AsyncpgPoolWrapper

-    def get_data_processor(self) -> OperationProcessorAsyncBase:
-        return AsyncpgOperationProcessor(pg_pool=self.pool)
+    def get_data_processor(
+        self,
+        service_registry: ServicesRegistry,
+        reporting_enabled: bool,
+    ) -> OperationProcessorAsyncBase:
+        return AsyncpgOperationProcessor(
+            service_registry=service_registry,
+            pg_pool=self.pool,
+            reporting_enabled=reporting_enabled,
+        )
@@ -5,6 +5,8 @@
 from typing import (
     TYPE_CHECKING,
     AsyncGenerator,
+    Awaitable,
+    Callable,
     ClassVar,
     Generator,
     Optional,
@@ -20,7 +22,9 @@
 from dl_compeng_pg.compeng_pg_base.exec_adapter_base import PostgreSQLExecAdapterAsync
 from dl_constants.enums import UserDataType
 from dl_core.connectors.base.error_transformer import DbErrorTransformer
-from dl_core.data_processing.prepared_components.primitives import PreparedMultiFromInfo
+from dl_core.data_processing.cache.primitives import LocalKeyRepresentation
+from dl_core.data_processing.prepared_components.primitives import PreparedFromInfo
+from dl_core.data_processing.processing.context import OpExecutionContext
 from dl_core.data_processing.streaming import (
     AsyncChunked,
     AsyncChunkedBase,
@@ -89,11 +93,17 @@ async def _execute_and_fetch(  # type: ignore  # TODO: fix
         query: Union[str, sa.sql.selectable.Select],
         user_types: Sequence[UserDataType],
         chunk_size: int,
-        joint_dsrc_info: Optional[PreparedMultiFromInfo] = None,
+        joint_dsrc_info: Optional[PreparedFromInfo] = None,
+        query_id: str,
+        ctx: OpExecutionContext,
+        data_key: LocalKeyRepresentation,
+        preparation_callback: Optional[Callable[[], Awaitable[None]]],
     ) -> AsyncChunked[list[TBIDataValue]]:
         query_text, params = self._compile_query(query)

+        if preparation_callback is not None:
+            await preparation_callback()
+
         async def chunked_data_gen() -> AsyncGenerator[list[list[TBIDataValue]], None]:
             """Fetch data in chunks"""
@@ -12,9 +12,7 @@


 @attr.s
-class AsyncpgOperationProcessor(
-    PostgreSQLOperationProcessor[AsyncpgExecAdapter, AsyncpgPoolWrapper, asyncpg.pool.PoolConnectionProxy]
-):
+class AsyncpgOperationProcessor(PostgreSQLOperationProcessor[AsyncpgPoolWrapper, asyncpg.pool.PoolConnectionProxy]):
     _cmstack: Optional[AsyncExitStack] = attr.ib(init=False, default=None)
     _timeout = 1.5

@@ -25,10 +23,17 @@ async def start(self) -> None:
         pg_conn = await cmstack.enter_async_context(self._pg_pool.pool.acquire(timeout=self._timeout))
         self._pg_conn = pg_conn
         await cmstack.enter_async_context(pg_conn.transaction())
-        self._pgex_adapter = AsyncpgExecAdapter(conn=pg_conn)
+        self._db_ex_adapter = AsyncpgExecAdapter(
+            service_registry=self.service_registry,
+            reporting_enabled=self._reporting_enabled,
+            conn=pg_conn,
+            cache_options_builder=self._cache_options_builder,
+        )

     async def end(self) -> None:
-        self._pgex_adapter = None
+        assert self._db_ex_adapter is not None
+        assert self._cmstack is not None
+        self._db_ex_adapter = None
         await self._cmstack.aclose()  # type: ignore  # TODO: fix
         self._pg_conn = None
         self._cmstack = None
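The asyncpg variant keeps the connection acquisition and the transaction on a single AsyncExitStack, so end() can unwind both with one aclose(). A standalone sketch of that pattern outside the processor; the DSN, pool setup, and query are placeholders:

    from contextlib import AsyncExitStack

    import asyncpg  # assumed dependency, as in the adapter above


    async def run(dsn: str) -> None:
        pool = await asyncpg.create_pool(dsn)
        cmstack = AsyncExitStack()
        # Enter the connection and transaction contexts on one stack ...
        conn = await cmstack.enter_async_context(pool.acquire(timeout=1.5))
        await cmstack.enter_async_context(conn.transaction())
        try:
            await conn.execute("SELECT 1")
        finally:
            # ... so teardown is a single aclose(), like end() in the processor.
            await cmstack.aclose()
            await pool.close()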
@@ -17,6 +17,7 @@
 from sqlalchemy.sql.base import Executable

 from dl_constants.enums import UserDataType
+from dl_core.connectors.base.query_compiler import QueryCompiler
 from dl_core.data_processing.processing.db_base.exec_adapter_base import ProcessorDbExecAdapterBase
 from dl_core.data_processing.streaming import AsyncChunkedBase
 from dl_core.db.sa_types import make_sa_type
@@ -69,13 +70,12 @@ async def create_table(
         table_name: str,
         names: Sequence[str],
         user_types: Sequence[UserDataType],
-    ) -> sa.sql.selectable.TableClause:
+    ) -> None:
         """Create table in database"""

         table = self._make_sa_table(table_name=table_name, names=names, user_types=user_types)
         self._log.info(f"Creating PG processor table {table_name}: {table}")
         await self._execute_ddl(sa.schema.CreateTable(table))
-        return table

     async def _drop_table(self, table_name: str) -> None:
         await self._execute_ddl(sa.schema.DropTable(sa.table(table_name)))  # type: ignore
@@ -96,3 +96,6 @@ async def insert_data_into_table(
         data: AsyncChunkedBase,
     ) -> None:
         """,,,"""
+
+    def get_query_compiler(self) -> QueryCompiler:
+        return QueryCompiler(dialect=self.dialect)
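Since create_table no longer returns the TableClause, a caller that needs a selectable for the freshly created table presumably rebuilds it from the same name and columns. A minimal SQLAlchemy sketch under that assumption; the table and column names are hypothetical:

    import sqlalchemy as sa

    # Rebuild a lightweight clause for a table created via create_table();
    # the names and columns here are placeholders, not from this commit.
    table = sa.table("tmp_compeng_table", sa.column("id"), sa.column("value"))
    query = sa.select(table.c.value).where(table.c.id == 42)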
lib/dl_compeng_pg/dl_compeng_pg/compeng_pg_base/op_executors.py (91 changes: 0 additions & 91 deletions)

This file was deleted.
