Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removed ConnectionHardcodedDataMixin, moved loading of connector settings to LCM from connection #428

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import attr
import xxhash

from dl_configs.connectors_settings import ConnectorSettingsBase
from dl_constants.enums import (
DataSourceRole,
FileProcessingStatus,
Expand All @@ -28,7 +29,6 @@
from dl_core.services_registry.file_uploader_client_factory import FileSourceDesc
from dl_core.us_connection_base import (
ConnectionBase,
ConnectionHardcodedDataMixin,
DataSourceTemplate,
)
from dl_core.utils import (
Expand All @@ -50,7 +50,7 @@
LOGGER = logging.getLogger(__name__)


class BaseFileS3Connection(ConnectionHardcodedDataMixin[FileS3ConnectorSettings], ConnectionClickhouseBase):
class BaseFileS3Connection(ConnectionClickhouseBase):
is_always_internal_source: ClassVar[bool] = True
allow_cache: ClassVar[bool] = True
settings_type = FileS3ConnectorSettings
Expand Down Expand Up @@ -103,6 +103,12 @@ def str_for_hash(self) -> str:

data: DataModel

@property
def _connector_settings(self) -> FileS3ConnectorSettings:
settings = super()._connector_settings
assert isinstance(settings, FileS3ConnectorSettings)
return settings

def get_replace_secret(self) -> str:
return xxhash.xxh64(self.data.str_for_hash() + self._connector_settings.REPLACE_SECRET_SALT, seed=0).hexdigest()

Expand Down
10 changes: 7 additions & 3 deletions lib/dl_connector_chyt/dl_connector_chyt/core/us_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from dl_core.connection_executors.sync_base import SyncConnExecutorBase
from dl_core.us_connection_base import (
ConnectionBase,
ConnectionHardcodedDataMixin,
DataSourceTemplate,
SubselectMixin,
)
Expand All @@ -45,10 +44,9 @@
from dl_core.services_registry.top_level import ServicesRegistry


class BaseConnectionCHYT( # type: ignore # 2024-01-24 # TODO: Definition of "us_manager" in base class "USEntry" is incompatible with definition in base class "ConnectionHardcodedDataMixin" [misc]
class BaseConnectionCHYT(
SubselectMixin,
ConnectionBase,
ConnectionHardcodedDataMixin[CHYTConnectorSettings],
abc.ABC,
):
allow_dashsql: ClassVar[bool] = True
Expand All @@ -65,6 +63,12 @@ class DataModel(ConnCacheableDataModelMixin, ConnSubselectDataModelMixin, Connec
alias: str = attr.ib()
max_execution_time: Optional[int] = attr.ib(default=None)

@property
def _connector_settings(self) -> CHYTConnectorSettings:
settings = super()._connector_settings
assert isinstance(settings, CHYTConnectorSettings)
return settings

async def validate_new_data(
self,
services_registry: ServicesRegistry,
Expand Down
11 changes: 10 additions & 1 deletion lib/dl_core/dl_core/connectors/base/lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,16 @@


class ConnectionLifecycleManager(EntryLifecycleManager[_CONNECTION_TV], Generic[_CONNECTION_TV]):
pass
def _set_connector_settings_to_connection(self) -> None:
connectors_settings = self._service_registry.get_connectors_settings(self.entry.conn_type)
if connectors_settings is not None:
# TODO: Log
assert isinstance(connectors_settings, self.entry.settings_type)
self.entry.set_connector_settings(connectors_settings)

async def post_init_async_hook(self) -> None:
await super().post_init_async_hook()
self._set_connector_settings_to_connection()


class DefaultConnectionLifecycleManager(ConnectionLifecycleManager[ConnectionBase]):
Expand Down
2 changes: 1 addition & 1 deletion lib/dl_core/dl_core/services_registry/top_level.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ def get_data_processor_factory(self) -> BaseClosableDataProcessorFactory:
raise NotImplementedError(self.NOT_IMPLEMENTED_MSG)

def get_connectors_settings(self, conn_type: ConnectionType) -> Optional[ConnectorSettingsBase]:
raise NotImplementedError(self.NOT_IMPLEMENTED_MSG)
return None

def get_data_source_collection_factory(self, us_entry_buffer: USEntryBuffer) -> DataSourceCollectionFactory:
raise NotImplementedError(self.NOT_IMPLEMENTED_MSG)
Expand Down
41 changes: 11 additions & 30 deletions lib/dl_core/dl_core/us_connection_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
Any,
Callable,
ClassVar,
Generic,
NamedTuple,
Optional,
Type,
Expand Down Expand Up @@ -71,7 +70,6 @@
from dl_core.connection_models.common_models import TableIdent
from dl_core.services_registry import ServicesRegistry
from dl_core.us_manager.us_manager import USManagerBase
from dl_core.us_manager.us_manager_sync import SyncUSManager


LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -136,6 +134,7 @@ class ConnectionOptions:
class ConnectionBase(USEntry, metaclass=abc.ABCMeta):
dir_name: ClassVar[str] = "" # type: ignore # TODO: fix
scope: ClassVar[str] = "connection" # type: ignore # TODO: fix
settings_type: ClassVar[Type[ConnectorSettingsBase]] = ConnectorSettingsBase

conn_type: ConnectionType
source_type: ClassVar[Optional[DataSourceType]] = None
Expand Down Expand Up @@ -188,6 +187,8 @@ def __init__(
except ValueError:
self.conn_type = ConnectionType.unknown

self.__connector_settings: Optional[ConnectorSettingsBase] = None

@property
def allow_public_usage(self) -> bool:
return False
Expand Down Expand Up @@ -239,6 +240,14 @@ def is_dataset_allowed(self) -> bool:
def is_typed_query_allowed(self) -> bool:
return False

@property
def _connector_settings(self) -> ConnectorSettingsBase:
assert self.__connector_settings is not None
return self.__connector_settings

def set_connector_settings(self, connector_settings: ConnectorSettingsBase) -> None:
self.__connector_settings = connector_settings

def as_dict(self, short=False): # type: ignore # TODO: fix
resp = super().as_dict(short=short)
if short:
Expand Down Expand Up @@ -562,34 +571,6 @@ def parse_multihosts(self) -> tuple[str, ...]:
return parse_comma_separated_hosts(self.data.host)


CONNECTOR_SETTINGS_TV = TypeVar("CONNECTOR_SETTINGS_TV", bound=ConnectorSettingsBase)


class ConnectionHardcodedDataMixin(Generic[CONNECTOR_SETTINGS_TV], metaclass=abc.ABCMeta):
"""Connector type specific data is loaded from dl_configs.connectors_settings"""

conn_type: ConnectionType
settings_type: Type[CONNECTOR_SETTINGS_TV]
us_manager: SyncUSManager

# TODO: remove
@classmethod
def _get_connector_settings(cls, usm: SyncUSManager) -> CONNECTOR_SETTINGS_TV:
sr = usm.get_services_registry()
connectors_settings = sr.get_connectors_settings(cls.conn_type)
assert connectors_settings is not None
assert isinstance(connectors_settings, cls.settings_type)
return connectors_settings

@property
def _connector_settings(self) -> CONNECTOR_SETTINGS_TV:
sr = self.us_manager.get_services_registry()
connectors_settings = sr.get_connectors_settings(self.conn_type)
assert connectors_settings is not None
assert isinstance(connectors_settings, self.settings_type)
return connectors_settings


class HiddenDatabaseNameMixin(ConnectionSQL):
"""Mixin that hides db name in UI"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,11 @@ def _prepare_cache_data(self, entry: USEntry, mutation_key: MutationKey) -> str:
serialized_entry["unversionedData"] = serialized_entry.pop("unversioned_data")
return self._dump_raw_cache_data(mutation_key.get_collision_tier_breaker(), serialized_entry)

def _restore_entry_from_cache_representation(self, data: str) -> USEntry:
async def _restore_entry_from_cache_representation(self, data: str) -> USEntry:
entry_dict = json.loads(data)
return self._usm._entry_dict_to_obj(entry_dict)
obj = self._usm._entry_dict_to_obj(entry_dict)
await self._usm.get_lifecycle_manager(entry=obj).post_init_async_hook()
return obj

async def save_mutation_cache(self, entry: USEntry, mutation_key: MutationKey) -> None:
key = USEntryMutationCacheKey(
Expand Down Expand Up @@ -156,4 +158,4 @@ async def get_mutated_entry_from_cache(
if collision_meta != mutation_key.get_collision_tier_breaker():
return None

return self._restore_entry_from_cache_representation(us_entry_data)
return await self._restore_entry_from_cache_representation(us_entry_data)
1 change: 1 addition & 0 deletions lib/dl_core/dl_core/us_manager/us_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ def clone_entry_instance(self, entry: USEntry) -> USEntry:
unversionedData=entry_data.pop("unversioned_data"),
**entry_loc.to_us_resp_api_params(entry.raw_us_key),
)
# FIXME: Note that post_init_async_hook is not called here. Should it be?
return self._entry_dict_to_obj(entry_data)

@staticmethod
Expand Down
3 changes: 3 additions & 0 deletions lib/dl_core/dl_core/us_manager/us_manager_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ async def save(self, entry: USEntry) -> None:
resp = await self._us_client.update_entry(entry.uuid, lock=entry._lock, **save_params)

entry._us_resp = resp
await self.get_lifecycle_manager(entry=entry).post_init_async_hook()

async def delete(self, entry: USEntry) -> None:
# TODO FIX: Use pre_delete_async_hook!!!
Expand All @@ -182,6 +183,7 @@ async def reload_data(self, entry: USEntry) -> None:
reloaded_entry = self._entry_dict_to_obj(us_resp, expected_type=type(entry))
entry.data = reloaded_entry.data
entry._us_resp = us_resp
await self.get_lifecycle_manager(entry=entry).post_init_async_hook()

@asynccontextmanager # type: ignore # TODO: fix
async def locked_entry_cm(
Expand Down Expand Up @@ -333,6 +335,7 @@ async def get_collection(
# noinspection PyBroadException
try:
obj: USEntry = self._entry_dict_to_obj(us_resp, entry_cls)
await self.get_lifecycle_manager(entry=obj).post_init_async_hook()
yield obj
except Exception:
LOGGER.exception("Failed to load US object: %s", us_resp)
Expand Down
7 changes: 5 additions & 2 deletions lib/dl_core/dl_core/us_manager/us_manager_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
Dict,
Generator,
Iterable,
List,
Optional,
Set,
Type,
Expand Down Expand Up @@ -147,6 +146,7 @@ def save(self, entry: USEntry) -> None:
)

entry._us_resp = resp # type: ignore # TODO: fix
await_sync(self.get_lifecycle_manager(entry=entry).post_init_async_hook())
lifecycle_manager.post_save_hook()

def delete(self, entry: USEntry) -> None:
Expand Down Expand Up @@ -229,7 +229,9 @@ def get_collection(
for us_resp in us_entry_iterator:
# noinspection PyBroadException
try:
yield self._entry_dict_to_obj(us_resp, expected_type=entry_cls) # type: ignore # TODO: fix
obj = self._entry_dict_to_obj(us_resp, expected_type=entry_cls)
await_sync(self.get_lifecycle_manager(entry=obj).post_init_async_hook())
yield obj # type: ignore # TODO: Incompatible types in "yield" (actual type "USEntry", expected type "_ENTRY_TV") [misc]
except Exception:
LOGGER.exception("Failed to load US object: %s", us_resp)
if raise_on_broken_entry:
Expand Down Expand Up @@ -261,6 +263,7 @@ def reload_data(self, entry: USEntry) -> None:
reloaded_entry = self._entry_dict_to_obj(us_resp, expected_type=type(entry))
entry.data = reloaded_entry.data
entry._us_resp = us_resp
await_sync(self.get_lifecycle_manager(entry=entry).post_init_async_hook())

# Locks
#
Expand Down
Loading