
Commit afcf4bb
Removed legacy selector code (#59)
altvod authored Nov 8, 2023
1 parent 0fa5463 commit afcf4bb
Showing 11 changed files with 33 additions and 337 deletions.
3 changes: 0 additions & 3 deletions lib/dl_api_lib/dl_api_lib/dataset/view.py
@@ -20,7 +20,6 @@
CalcMode,
DataSourceRole,
ProcessorType,
SelectorType,
)
from dl_core.fields import BIField
from dl_core.us_connection_base import ClassicConnectionSQL
@@ -130,7 +129,6 @@ class DatasetView(DatasetBaseWrapper):

_SOURCE_DB_PROCESSOR_TYPE = ProcessorType.SOURCE_DB
_COMPENG_PROCESSOR_TYPE = ProcessorType.ASYNCPG
_SELECTOR_TYPE = SelectorType.CACHED_LAZY

_verbose_logging = True

@@ -180,7 +178,6 @@ def make_query_executor(self, allow_cache_usage: bool) -> QueryExecutorBase:
avatar_alias_mapper=self._avatar_alias_mapper,
compeng_processor_type=self._COMPENG_PROCESSOR_TYPE,
source_db_processor_type=self._SOURCE_DB_PROCESSOR_TYPE,
selector_type=self._SELECTOR_TYPE,
allow_cache_usage=allow_cache_usage,
us_manager=self._us_manager,
compeng_semaphore=self._compeng_semaphore,
5 changes: 0 additions & 5 deletions lib/dl_constants/dl_constants/enums.py
@@ -326,11 +326,6 @@ class RangeType(Enum):
max = "max"


class SelectorType(Enum):
CACHED = auto()
CACHED_LAZY = auto()


class ProcessorType(Enum):
SOURCE_DB = auto()
ASYNCPG = auto()
19 changes: 0 additions & 19 deletions lib/dl_core/dl_core/data_processing/cache/utils.py
@@ -34,7 +34,6 @@
from dl_constants.enums import UserDataType
from dl_constants.types import TJSONExt
from dl_core.data_processing.prepared_components.primitives import PreparedFromInfo
from dl_core.data_processing.stream_base import DataStreamBase
from dl_core.data_source.base import DataSource
from dl_core.us_dataset import Dataset
from dl_core.us_manager.local_cache import USEntryBuffer
@@ -157,24 +156,6 @@ def get_cache_options(
refresh_ttl_on_read=ttl_info.refresh_ttl_on_read,
)

def get_cache_options_for_stream(
self,
stream: DataStreamBase,
dataset: Optional[Dataset] = None,
) -> BIQueryCacheOptions:
ttl_info = self.get_cache_ttl_info(
data_source_list=stream.meta.data_source_list,
dataset=dataset,
)
key = stream.data_key
cache_enabled = key is not None
return BIQueryCacheOptions(
cache_enabled=cache_enabled,
key=key,
ttl_sec=ttl_info.ttl_sec,
refresh_ttl_on_read=ttl_info.refresh_ttl_on_read,
)


@attr.s
class SelectorCacheOptionsBuilder(DatasetOptionsBuilder):
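For context, the deleted get_cache_options_for_stream helper encoded a simple rule: a stream is cacheable only if it carries a data key, with TTL settings taken from the data sources behind it. Below is a standalone sketch of that rule using simplified stand-in types; the real BIQueryCacheOptions, TTL info, and stream classes live in dl_core and have richer signatures, so treat every name here as hypothetical.

from dataclasses import dataclass
from typing import Optional


# Simplified stand-ins for the dl_core types (illustration only).
@dataclass
class TTLInfo:
    ttl_sec: int
    refresh_ttl_on_read: bool


@dataclass
class QueryCacheOptions:
    cache_enabled: bool
    key: Optional[str]
    ttl_sec: int
    refresh_ttl_on_read: bool


def cache_options_for_stream(data_key: Optional[str], ttl_info: TTLInfo) -> QueryCacheOptions:
    # Caching is enabled only when the stream provides a data key;
    # the TTL settings are passed through unchanged.
    return QueryCacheOptions(
        cache_enabled=data_key is not None,
        key=data_key,
        ttl_sec=ttl_info.ttl_sec,
        refresh_ttl_on_read=ttl_info.refresh_ttl_on_read,
    )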

This file was deleted.

106 changes: 0 additions & 106 deletions lib/dl_core/dl_core/data_processing/selectors/dataset_cached.py

This file was deleted.

This file was deleted.

32 changes: 31 additions & 1 deletion lib/dl_core/dl_core/data_processing/selectors/db.py
@@ -15,6 +15,7 @@
from dl_core.data_processing.streaming import (
AsyncChunked,
AsyncChunkedBase,
LazyAsyncChunked,
)
import dl_core.exc as exc
from dl_core.us_connection_base import ExecutorBasedMixin
@@ -28,6 +29,26 @@
@attr.s
class DatasetDbDataSelectorAsync(DatasetDataSelectorAsyncBase):
"""Async selector that fetches data from the database"""
# TODO: Merge all selector logic into data processors

_active_queries: list[BIQueryExecutionContext] = attr.ib(init=False, factory=list)

def post_exec(
self,
query_execution_ctx: BIQueryExecutionContext,
exec_exception: Optional[Exception],
) -> None:
"""Lazy behaviour: since the query has not even been executed at this point, do nothing."""

async def close(self) -> None:
"""Close all active queries"""
while self._active_queries:
await self._close_and_remove_active_query(query_execution_ctx=self._active_queries[-1])

async def _close_and_remove_active_query(self, query_execution_ctx: BIQueryExecutionContext): # type: ignore # TODO: fix
if query_execution_ctx in self._active_queries:
self._active_queries.remove(query_execution_ctx)
super().post_exec(query_execution_ctx=query_execution_ctx, exec_exception=None)

async def execute_query_context(
self,
@@ -55,4 +76,13 @@ async def execute_query_context(
chunk_size=None,
)
)
return AsyncChunked(chunked_data=exec_result.result)
wrapped_result_iter = AsyncChunked(chunked_data=exec_result.result)

async def initialize_data_stream() -> AsyncChunkedBase[list[TBIDataValue]]:
self._active_queries.append(query_execution_ctx)
return wrapped_result_iter # type: ignore # TODO: fix

async def finalize_data_stream() -> None:
await self._close_and_remove_active_query(query_execution_ctx=query_execution_ctx)

return LazyAsyncChunked(initializer=initialize_data_stream, finalizer=finalize_data_stream)
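
The LazyAsyncChunked wrapper used above is imported from dl_core.data_processing.streaming; its implementation is not part of this diff. As an illustration only, here is a minimal sketch of the lazy initializer/finalizer pattern the selector relies on. All names (LazyChunkedSketch, produce, init, fin) are hypothetical stand-ins, not dl_core API.

import asyncio
from typing import AsyncIterator, Awaitable, Callable, Generic, List, Optional, TypeVar

T = TypeVar("T")


class LazyChunkedSketch(Generic[T]):
    """Run the initializer only when the first chunk is requested; run the
    finalizer once the stream is exhausted or explicitly closed, so close()
    is safe even if nothing was ever read."""

    def __init__(
        self,
        initializer: Callable[[], Awaitable[AsyncIterator[List[T]]]],
        finalizer: Callable[[], Awaitable[None]],
    ) -> None:
        self._initializer = initializer
        self._finalizer = finalizer
        self._source: Optional[AsyncIterator[List[T]]] = None
        self._closed = False

    async def chunks(self) -> AsyncIterator[List[T]]:
        if self._source is None:
            # The underlying query starts here, on first consumption, not at construction.
            self._source = await self._initializer()
        try:
            async for chunk in self._source:
                yield chunk
        finally:
            await self.close()

    async def close(self) -> None:
        if not self._closed:
            self._closed = True
            await self._finalizer()


async def _demo() -> None:
    async def produce() -> AsyncIterator[List[int]]:
        for chunk in ([1, 2], [3, 4]):
            yield chunk

    async def init() -> AsyncIterator[List[int]]:
        print("query started")  # would register the query as active
        return produce()

    async def fin() -> None:
        print("query closed")  # would deregister the query

    stream = LazyChunkedSketch(initializer=init, finalizer=fin)
    async for chunk in stream.chunks():
        print(chunk)


if __name__ == "__main__":
    asyncio.run(_demo())

Shaped this way, DatasetDbDataSelectorAsync only marks a query as active once its data is actually consumed, and close() can still clean up queries whose streams were never iterated.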
