Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removed legacy selector code #59

Merged
merged 1 commit into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions lib/dl_api_lib/dl_api_lib/dataset/view.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
CalcMode,
DataSourceRole,
ProcessorType,
SelectorType,
)
from dl_core.fields import BIField
from dl_core.us_connection_base import ClassicConnectionSQL
Expand Down Expand Up @@ -130,7 +129,6 @@ class DatasetView(DatasetBaseWrapper):

_SOURCE_DB_PROCESSOR_TYPE = ProcessorType.SOURCE_DB
_COMPENG_PROCESSOR_TYPE = ProcessorType.ASYNCPG
_SELECTOR_TYPE = SelectorType.CACHED_LAZY

_verbose_logging = True

Expand Down Expand Up @@ -180,7 +178,6 @@ def make_query_executor(self, allow_cache_usage: bool) -> QueryExecutorBase:
avatar_alias_mapper=self._avatar_alias_mapper,
compeng_processor_type=self._COMPENG_PROCESSOR_TYPE,
source_db_processor_type=self._SOURCE_DB_PROCESSOR_TYPE,
selector_type=self._SELECTOR_TYPE,
allow_cache_usage=allow_cache_usage,
us_manager=self._us_manager,
compeng_semaphore=self._compeng_semaphore,
Expand Down
5 changes: 0 additions & 5 deletions lib/dl_constants/dl_constants/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,11 +326,6 @@ class RangeType(Enum):
max = "max"


class SelectorType(Enum):
CACHED = auto()
CACHED_LAZY = auto()


class ProcessorType(Enum):
SOURCE_DB = auto()
ASYNCPG = auto()
Expand Down
19 changes: 0 additions & 19 deletions lib/dl_core/dl_core/data_processing/cache/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
from dl_constants.enums import UserDataType
from dl_constants.types import TJSONExt
from dl_core.data_processing.prepared_components.primitives import PreparedFromInfo
from dl_core.data_processing.stream_base import DataStreamBase
from dl_core.data_source.base import DataSource
from dl_core.us_dataset import Dataset
from dl_core.us_manager.local_cache import USEntryBuffer
Expand Down Expand Up @@ -157,24 +156,6 @@ def get_cache_options(
refresh_ttl_on_read=ttl_info.refresh_ttl_on_read,
)

def get_cache_options_for_stream(
self,
stream: DataStreamBase,
dataset: Optional[Dataset] = None,
) -> BIQueryCacheOptions:
ttl_info = self.get_cache_ttl_info(
data_source_list=stream.meta.data_source_list,
dataset=dataset,
)
key = stream.data_key
cache_enabled = key is not None
return BIQueryCacheOptions(
cache_enabled=cache_enabled,
key=key,
ttl_sec=ttl_info.ttl_sec,
refresh_ttl_on_read=ttl_info.refresh_ttl_on_read,
)


@attr.s
class SelectorCacheOptionsBuilder(DatasetOptionsBuilder):
Expand Down

This file was deleted.

106 changes: 0 additions & 106 deletions lib/dl_core/dl_core/data_processing/selectors/dataset_cached.py

This file was deleted.

This file was deleted.

32 changes: 31 additions & 1 deletion lib/dl_core/dl_core/data_processing/selectors/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from dl_core.data_processing.streaming import (
AsyncChunked,
AsyncChunkedBase,
LazyAsyncChunked,
)
import dl_core.exc as exc
from dl_core.us_connection_base import ExecutorBasedMixin
Expand All @@ -28,6 +29,26 @@
@attr.s
class DatasetDbDataSelectorAsync(DatasetDataSelectorAsyncBase):
"""Async selector that fetches data from the database"""
# TODO: Merge all selector logic into data processors

_active_queries: list[BIQueryExecutionContext] = attr.ib(init=False, factory=list)

def post_exec(
self,
query_execution_ctx: BIQueryExecutionContext,
exec_exception: Optional[Exception],
) -> None:
"""Lazy behaviour: since the query has not even been executed at this point, do nothing."""

async def close(self) -> None:
"""Close all active queries"""
while self._active_queries:
await self._close_and_remove_active_query(query_execution_ctx=self._active_queries[-1])

async def _close_and_remove_active_query(self, query_execution_ctx: BIQueryExecutionContext): # type: ignore # TODO: fix
if query_execution_ctx in self._active_queries:
self._active_queries.remove(query_execution_ctx)
super().post_exec(query_execution_ctx=query_execution_ctx, exec_exception=None)

async def execute_query_context(
self,
Expand Down Expand Up @@ -55,4 +76,13 @@ async def execute_query_context(
chunk_size=None,
)
)
return AsyncChunked(chunked_data=exec_result.result)
wrapped_result_iter = AsyncChunked(chunked_data=exec_result.result)

async def initialize_data_stream() -> AsyncChunkedBase[list[TBIDataValue]]:
self._active_queries.append(query_execution_ctx)
return wrapped_result_iter # type: ignore # TODO: fix

async def finalize_data_stream() -> None:
await self._close_and_remove_active_query(query_execution_ctx=query_execution_ctx)

return LazyAsyncChunked(initializer=initialize_data_stream, finalizer=finalize_data_stream)
Loading
Loading