diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_gsheets/core/constants.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_gsheets/core/constants.py index 4717b3641..1ed334685 100644 --- a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_gsheets/core/constants.py +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_gsheets/core/constants.py @@ -4,9 +4,14 @@ NotificationType, ) +from dl_connector_bundle_chs3.constants import ( + NOTIF_TYPE_DATA_UPDATE_FAILURE, + NOTIF_TYPE_STALE_DATA, +) + CONNECTION_TYPE_GSHEETS_V2 = ConnectionType.declare("gsheets_v2") SOURCE_TYPE_GSHEETS_V2 = DataSourceType.declare("GSHEETS_V2") -NOTIF_TYPE_GSHEETS_V2_STALE_DATA = NotificationType.declare("stale_data") -NOTIF_TYPE_GSHEETS_V2_DATA_UPDATE_FAILURE = NotificationType.declare("data_update_failure") +NOTIF_TYPE_GSHEETS_V2_STALE_DATA = NOTIF_TYPE_STALE_DATA +NOTIF_TYPE_GSHEETS_V2_DATA_UPDATE_FAILURE = NOTIF_TYPE_DATA_UPDATE_FAILURE diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/__init__.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/api/__init__.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/api/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/api/api_schema/__init__.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/api/api_schema/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/api/api_schema/connection.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/api/api_schema/connection.py new file mode 100644 index 000000000..ec3e8f1a0 --- /dev/null +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/api/api_schema/connection.py @@ -0,0 +1,34 @@ +from __future__ import annotations + +from marshmallow import ( + fields, + validate, +) + +from dl_api_connector.api_schema.extras import FieldExtra + +from dl_connector_bundle_chs3.chs3_base.api.api_schema.connection import BaseFileS3ConnectionSchema +from dl_connector_bundle_chs3.chs3_yadocs.api.api_schema.source import YaDocsFileSourceSchema +from dl_connector_bundle_chs3.chs3_yadocs.core.us_connection import YaDocsFileS3Connection + + +class YaDocsFileS3ConnectionSchema(BaseFileS3ConnectionSchema): + TARGET_CLS = YaDocsFileS3Connection + + sources = fields.Nested( + YaDocsFileSourceSchema, + many=True, + attribute="data.sources", + bi_extra=FieldExtra(editable=True), + validate=validate.Length(min=1, max=10), + ) + + oauth_token = fields.String( + attribute="data.oauth_token", + load_default=None, + allow_none=True, + load_only=True, + bi_extra=FieldExtra(editable=True), + ) + refresh_enabled = fields.Boolean(attribute="data.refresh_enabled", bi_extra=FieldExtra(editable=True)) + authorized = fields.Boolean(dump_only=True) diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/api/api_schema/source.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/api/api_schema/source.py new file mode 100644 index 000000000..56de4c831 --- /dev/null +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/api/api_schema/source.py @@ -0,0 +1,15 @@ +from __future__ import annotations + +from marshmallow import fields + +from dl_connector_bundle_chs3.chs3_base.api.api_schema.source import BaseFileSourceSchema +from dl_connector_bundle_chs3.chs3_yadocs.core.us_connection import YaDocsFileS3Connection + + +class YaDocsFileSourceSchema(BaseFileSourceSchema): + class Meta(BaseFileSourceSchema.Meta): + target = YaDocsFileS3Connection.FileDataSource + + public_link = fields.String(dump_only=True) + private_path = fields.String(dump_only=True) + first_line_is_header = fields.Boolean(dump_only=True) diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/api/connection_info.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/api/connection_info.py new file mode 100644 index 000000000..572dc0f0f --- /dev/null +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/api/connection_info.py @@ -0,0 +1,9 @@ +from __future__ import annotations + +from dl_api_connector.connection_info import ConnectionInfoProvider + +from dl_connector_bundle_chs3.chs3_base.api.i18n.localizer import Translatable + + +class YaDocsFileS3ConnectionInfoProvider(ConnectionInfoProvider): + title_translatable = Translatable("label_connector-yadocs") diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/api/connector.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/api/connector.py new file mode 100644 index 000000000..8a9e583e7 --- /dev/null +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/api/connector.py @@ -0,0 +1,30 @@ +from __future__ import annotations + +from dl_connector_bundle_chs3.chs3_base.api.connector import ( + BaseFileS3ApiConnectionDefinition, + BaseFileS3ApiConnector, + BaseFileS3TableApiSourceDefinition, +) +from dl_connector_bundle_chs3.chs3_yadocs.api.api_schema.connection import YaDocsFileS3ConnectionSchema +from dl_connector_bundle_chs3.chs3_yadocs.api.connection_info import YaDocsFileS3ConnectionInfoProvider +from dl_connector_bundle_chs3.chs3_yadocs.core.connector import ( + YaDocsFileS3CoreConnectionDefinition, + YaDocsFileS3CoreConnector, + YaDocsFileS3TableCoreSourceDefinition, +) + + +class YaDocsFileS3TableApiSourceDefinition(BaseFileS3TableApiSourceDefinition): + core_source_def_cls = YaDocsFileS3TableCoreSourceDefinition + + +class YaDocsFileS3ApiConnectionDefinition(BaseFileS3ApiConnectionDefinition): + core_conn_def_cls = YaDocsFileS3CoreConnectionDefinition + api_generic_schema_cls = YaDocsFileS3ConnectionSchema + info_provider_cls = YaDocsFileS3ConnectionInfoProvider + + +class YaDocsFileS3ApiConnector(BaseFileS3ApiConnector): + core_connector_cls = YaDocsFileS3CoreConnector + connection_definitions = (YaDocsFileS3ApiConnectionDefinition,) + source_definitions = (YaDocsFileS3TableApiSourceDefinition,) diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/__init__.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/adapter.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/adapter.py new file mode 100644 index 000000000..dc802a124 --- /dev/null +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/adapter.py @@ -0,0 +1,11 @@ +import logging + +from dl_connector_bundle_chs3.chs3_base.core.adapter import BaseAsyncFileS3Adapter +from dl_connector_bundle_chs3.chs3_yadocs.core.constants import CONNECTION_TYPE_YADOCS + + +LOGGER = logging.getLogger(__name__) + + +class AsyncYaDocsFileS3Adapter(BaseAsyncFileS3Adapter): + conn_type = CONNECTION_TYPE_YADOCS diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/connection_executors.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/connection_executors.py new file mode 100644 index 000000000..acecd8500 --- /dev/null +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/connection_executors.py @@ -0,0 +1,9 @@ +import attr + +from dl_connector_bundle_chs3.chs3_base.core.connection_executors import BaseFileS3AsyncAdapterConnExecutor +from dl_connector_bundle_chs3.chs3_yadocs.core.adapter import AsyncYaDocsFileS3Adapter + + +@attr.s(cmp=False, hash=False) +class YaDocsFileS3AsyncAdapterConnExecutor(BaseFileS3AsyncAdapterConnExecutor): + TARGET_ADAPTER_CLS = AsyncYaDocsFileS3Adapter # type: ignore # TODO: FIX diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/connector.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/connector.py new file mode 100644 index 000000000..bbb13397f --- /dev/null +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/connector.py @@ -0,0 +1,53 @@ +from dl_connector_bundle_chs3.chs3_base.core.connector import ( + BaseFileS3CoreConnectionDefinition, + BaseFileS3CoreConnector, + BaseFileS3TableCoreSourceDefinition, +) +from dl_connector_bundle_chs3.chs3_yadocs.core.adapter import AsyncYaDocsFileS3Adapter +from dl_connector_bundle_chs3.chs3_yadocs.core.connection_executors import YaDocsFileS3AsyncAdapterConnExecutor +from dl_connector_bundle_chs3.chs3_yadocs.core.constants import ( + CONNECTION_TYPE_YADOCS, + SOURCE_TYPE_YADOCS, +) +from dl_connector_bundle_chs3.chs3_yadocs.core.data_source import YaDocsFileS3DataSource +from dl_connector_bundle_chs3.chs3_yadocs.core.data_source_spec import YaDocsFileS3DataSourceSpec +from dl_connector_bundle_chs3.chs3_yadocs.core.lifecycle import YaDocsFileS3ConnectionLifecycleManager +from dl_connector_bundle_chs3.chs3_yadocs.core.notifications import ( + DataUpdateFailureNotification, + StaleDataNotification, +) +from dl_connector_bundle_chs3.chs3_yadocs.core.settings import YaDocsFileS3SettingDefinition +from dl_connector_bundle_chs3.chs3_yadocs.core.storage_schemas.connection import YaDocsFileConnectionDataStorageSchema +from dl_connector_bundle_chs3.chs3_yadocs.core.storage_schemas.data_source_spec import ( + YaDocsFileS3DataSourceSpecStorageSchema, +) +from dl_connector_bundle_chs3.chs3_yadocs.core.us_connection import YaDocsFileS3Connection +from dl_connector_clickhouse.core.clickhouse_base.adapters import ClickHouseAdapter + + +class YaDocsFileS3CoreConnectionDefinition(BaseFileS3CoreConnectionDefinition): + conn_type = CONNECTION_TYPE_YADOCS + connection_cls = YaDocsFileS3Connection + us_storage_schema_cls = YaDocsFileConnectionDataStorageSchema + sync_conn_executor_cls = YaDocsFileS3AsyncAdapterConnExecutor + async_conn_executor_cls = YaDocsFileS3AsyncAdapterConnExecutor + lifecycle_manager_cls = YaDocsFileS3ConnectionLifecycleManager + dialect_string = "bi_clickhouse" + settings_definition = YaDocsFileS3SettingDefinition + + +class YaDocsFileS3TableCoreSourceDefinition(BaseFileS3TableCoreSourceDefinition): + source_type = SOURCE_TYPE_YADOCS + source_cls = YaDocsFileS3DataSource + source_spec_cls = YaDocsFileS3DataSourceSpec + us_storage_schema_cls = YaDocsFileS3DataSourceSpecStorageSchema + + +class YaDocsFileS3CoreConnector(BaseFileS3CoreConnector): + connection_definitions = (YaDocsFileS3CoreConnectionDefinition,) + source_definitions = (YaDocsFileS3TableCoreSourceDefinition,) + rqe_adapter_classes = frozenset({ClickHouseAdapter, AsyncYaDocsFileS3Adapter}) + notification_classes = ( + StaleDataNotification, + DataUpdateFailureNotification, + ) diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/constants.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/constants.py new file mode 100644 index 000000000..12e61a91c --- /dev/null +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/constants.py @@ -0,0 +1,12 @@ +from dl_constants.enums import ( + ConnectionType, + DataSourceType, + NotificationType, +) +from dl_connector_bundle_chs3.constants import NOTIF_TYPE_STALE_DATA, NOTIF_TYPE_DATA_UPDATE_FAILURE + +CONNECTION_TYPE_YADOCS = ConnectionType.declare("yadocs") +SOURCE_TYPE_YADOCS = DataSourceType.declare("YADOCS") + +NOTIF_TYPE_YADOCS_STALE_DATA = NOTIF_TYPE_STALE_DATA +NOTIF_TYPE_YADOCS_DATA_UPDATE_FAILURE = NOTIF_TYPE_DATA_UPDATE_FAILURE diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/data_source.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/data_source.py new file mode 100644 index 000000000..ae88e5a40 --- /dev/null +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/data_source.py @@ -0,0 +1,51 @@ +from __future__ import annotations + +from dl_constants.enums import ( + ComponentErrorLevel, + DataSourceType, +) +from dl_core import exc +from dl_core.reporting.notifications import get_notification_record + +from dl_connector_bundle_chs3.chs3_base.core.data_source import BaseFileS3DataSource +from dl_connector_bundle_chs3.chs3_yadocs.core.constants import ( + CONNECTION_TYPE_YADOCS, + NOTIF_TYPE_YADOCS_DATA_UPDATE_FAILURE, + SOURCE_TYPE_YADOCS, +) + + +class YaDocsFileS3DataSource(BaseFileS3DataSource): + conn_type = CONNECTION_TYPE_YADOCS + + @classmethod + def is_compatible_with_type(cls, source_type: DataSourceType) -> bool: + return source_type in { + SOURCE_TYPE_YADOCS, + } + + def _handle_component_errors(self) -> None: + conn_src_id = self.origin_source_id + if conn_src_id is not None and (error_pack := self.connection.data.component_errors.get_pack(conn_src_id)): + single_error = error_pack.errors[0] + + if single_error.level == ComponentErrorLevel.error: + + class ThisDataSourceError(exc.DataSourceErrorFromComponentError): + err_code = exc.DataSourceErrorFromComponentError.err_code + single_error.code + default_message = single_error.message + + raise ThisDataSourceError( + details=single_error.details, + ) + else: + reporting_registry = self._get_connection().us_manager.get_services_registry().get_reporting_registry() + # this is the only case of connection component errors at the moment + # may be generalized in the future + reporting_registry.save_reporting_record( + get_notification_record( + NOTIF_TYPE_YADOCS_DATA_UPDATE_FAILURE, + err_code=".".join(single_error.code), + request_id=single_error.details.get("request-id"), + ) + ) diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/data_source_spec.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/data_source_spec.py new file mode 100644 index 000000000..10afbdd1a --- /dev/null +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/data_source_spec.py @@ -0,0 +1,8 @@ +import attr + +from dl_connector_bundle_chs3.chs3_base.core.data_source_spec import BaseFileS3DataSourceSpec + + +@attr.s +class YaDocsFileS3DataSourceSpec(BaseFileS3DataSourceSpec): + pass diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/lifecycle.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/lifecycle.py new file mode 100644 index 000000000..30203eb38 --- /dev/null +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/lifecycle.py @@ -0,0 +1,62 @@ +import datetime +from typing import ClassVar + +from dl_constants.enums import FileProcessingStatus +from dl_core.connectors.base.lifecycle import ConnectionLifecycleManager +from dl_core.reporting.notifications import get_notification_record +from dl_core.utils import ( + make_user_auth_cookies, + make_user_auth_headers, +) + +from dl_connector_bundle_chs3.chs3_base.core.lifecycle import BaseFileS3ConnectionLifecycleManager +from dl_connector_bundle_chs3.chs3_yadocs.core.constants import NOTIF_TYPE_YADOCS_STALE_DATA +from dl_connector_bundle_chs3.chs3_yadocs.core.us_connection import YaDocsFileS3Connection + + +class YaDocsFileS3ConnectionLifecycleManager( + BaseFileS3ConnectionLifecycleManager, + ConnectionLifecycleManager[YaDocsFileS3Connection], +): + ENTRY_CLS = YaDocsFileS3Connection + + STALE_THRESHOLD_SECONDS: ClassVar[int] = 30 * 60 + + async def post_exec_async_hook(self) -> None: + await super().post_exec_async_hook() + + assert isinstance(self.entry, YaDocsFileS3Connection) + assert isinstance(self.entry.data, YaDocsFileS3Connection.DataModel) + data = self.entry.data + + if not data.refresh_enabled: + return + + dt_now = datetime.datetime.now(datetime.timezone.utc) + + data_updated_at_all = data.oldest_data_update_time() + if ( + data_updated_at_all is not None + and (dt_now - data_updated_at_all).total_seconds() >= self.STALE_THRESHOLD_SECONDS + ): + reporting_registry = self._service_registry.get_reporting_registry() + reporting_registry.save_reporting_record(get_notification_record(NOTIF_TYPE_YADOCS_STALE_DATA)) + + data_updated_at = data.oldest_data_update_time(exclude_statuses={FileProcessingStatus.in_progress}) + if data_updated_at is None or (dt_now - data_updated_at).total_seconds() < self.STALE_THRESHOLD_SECONDS: + return + + fu_client_factory = self._service_registry.get_file_uploader_client_factory() + rci = self._us_manager.bi_context + headers = make_user_auth_headers(rci=rci) + cookies = make_user_auth_cookies(rci=rci) + fu_client = fu_client_factory.get_client(headers=headers, cookies=cookies) + + sources = [src.get_desc() for src in data.sources] + assert self.entry.uuid is not None + await fu_client.update_connection_data_internal( + conn_id=self.entry.uuid, + sources=sources, + authorized=self.entry.authorized, + tenant_id=rci.tenant.get_tenant_id() if rci.tenant is not None else None, + ) diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/notifications.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/notifications.py new file mode 100644 index 000000000..4482bdc1a --- /dev/null +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/notifications.py @@ -0,0 +1,35 @@ +from typing import Optional + +from dl_constants.enums import NotificationLevel +from dl_core.reporting.notifications import BaseNotification + +from dl_connector_bundle_chs3.chs3_yadocs.core.constants import ( + NOTIF_TYPE_YADOCS_DATA_UPDATE_FAILURE, + NOTIF_TYPE_YADOCS_STALE_DATA, +) + + +class StaleDataNotification(BaseNotification): + type = NOTIF_TYPE_YADOCS_STALE_DATA + _title = "Stale data" + _message = "The data has not been updated for more than 30 minutes, a background update is in progress" + _level = NotificationLevel.info + + +class DataUpdateFailureNotification(BaseNotification): + def __init__(self, err_code: str, request_id: Optional[str]) -> None: + super().__init__() + self.err_code = err_code + self.request_id = request_id or "unknown" + + type = NOTIF_TYPE_YADOCS_DATA_UPDATE_FAILURE + _title = "Data update failed" + _message = ( + "The displayed data may be outdated due to the failure of the last update.\n" + "Reason: {err_code}, Request-ID: {request_id}." + ) + _level = NotificationLevel.warning + + @property + def message(self) -> str: + return self._message.format(err_code=self.err_code, request_id=self.request_id) diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/settings.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/settings.py new file mode 100644 index 000000000..58c2053a7 --- /dev/null +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/settings.py @@ -0,0 +1,41 @@ +from dl_configs.connectors_settings import ( + ConnectorsConfigType, + ConnectorSettingsBase, +) +from dl_configs.settings_loaders.meta_definition import required +from dl_core.connectors.settings.primitives import ( + ConnectorSettingsDefinition, + get_connectors_settings_config, +) + +from dl_connector_bundle_chs3.chs3_base.core.settings import ( + ConnectorsDataFileBase, + FileS3ConnectorSettings, +) + + +def yadocs_file_s3_settings_fallback(full_cfg: ConnectorsConfigType) -> dict[str, ConnectorSettingsBase]: + cfg = get_connectors_settings_config( + full_cfg, + object_like_config_key="FILE", + connector_data_class=ConnectorsDataFileBase, + ) + if cfg is None: + return {} + return dict( + YADOCS=FileS3ConnectorSettings( # type: ignore + HOST=cfg.CONN_FILE_CH_HOST, + PORT=cfg.CONN_FILE_CH_PORT, + USERNAME=cfg.CONN_FILE_CH_USERNAME, + PASSWORD=required(str), + ACCESS_KEY_ID=required(str), + SECRET_ACCESS_KEY=required(str), + S3_ENDPOINT=full_cfg.S3_ENDPOINT_URL, + BUCKET=full_cfg.FILE_UPLOADER_S3_PERSISTENT_BUCKET_NAME, + ) + ) + + +class YaDocsFileS3SettingDefinition(ConnectorSettingsDefinition): + settings_class = FileS3ConnectorSettings + fallback = yadocs_file_s3_settings_fallback diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/storage_schemas/__init__.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/storage_schemas/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/storage_schemas/connection.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/storage_schemas/connection.py new file mode 100644 index 000000000..2f432dc19 --- /dev/null +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/storage_schemas/connection.py @@ -0,0 +1,29 @@ +from marshmallow import fields + +from dl_core.us_manager.storage_schemas.connection import BaseConnectionDataStorageSchema + +from dl_connector_bundle_chs3.chs3_base.core.storage_schemas.connection import ( + BaseFileConnectionDataStorageSchema, + BaseFileConnectionSourceStorageSchema, +) +from dl_connector_bundle_chs3.chs3_yadocs.core.us_connection import YaDocsFileS3Connection + + +class YaDocsFileConnectionSourceStorageSchema(BaseFileConnectionSourceStorageSchema): + TARGET_CLS = YaDocsFileS3Connection.FileDataSource + + public_link = fields.String(allow_none=True, load_default=None) + private_path = fields.String(allow_none=True, load_default=None) + first_line_is_header = fields.Boolean(allow_none=True, load_default=None) + data_updated_at = fields.DateTime() + + +class YaDocsFileConnectionDataStorageSchema( + BaseFileConnectionDataStorageSchema, + BaseConnectionDataStorageSchema[YaDocsFileS3Connection.DataModel], +): + TARGET_CLS = YaDocsFileS3Connection.DataModel + + sources = fields.Nested(YaDocsFileConnectionSourceStorageSchema, many=True) + oauth_token = fields.String(allow_none=True, load_default=None) + refresh_enabled = fields.Boolean() diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/storage_schemas/data_source_spec.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/storage_schemas/data_source_spec.py new file mode 100644 index 000000000..0327a4504 --- /dev/null +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/storage_schemas/data_source_spec.py @@ -0,0 +1,8 @@ +from dl_connector_bundle_chs3.chs3_base.core.storage_schemas.data_source_spec import ( + BaseFileS3DataSourceSpecStorageSchema, +) +from dl_connector_bundle_chs3.chs3_yadocs.core.data_source_spec import YaDocsFileS3DataSourceSpec + + +class YaDocsFileS3DataSourceSpecStorageSchema(BaseFileS3DataSourceSpecStorageSchema): + TARGET_CLS = YaDocsFileS3DataSourceSpec diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/testing/__init__.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/testing/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/testing/connection.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/testing/connection.py new file mode 100644 index 000000000..4b68027ea --- /dev/null +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/testing/connection.py @@ -0,0 +1,30 @@ +from __future__ import annotations + +from typing import Any +import uuid + +from dl_core.us_manager.us_manager_sync import SyncUSManager + +from dl_connector_bundle_chs3.chs3_yadocs.core.constants import CONNECTION_TYPE_YADOCS +from dl_connector_bundle_chs3.chs3_yadocs.core.us_connection import YaDocsFileS3Connection + + +def make_saved_yadocs_connection( + sync_usm: SyncUSManager, + sources: list[YaDocsFileS3Connection.FileDataSource], + **kwargs: Any, +) -> YaDocsFileS3Connection: + conn_type = CONNECTION_TYPE_YADOCS + + conn_name = "{} test conn {}".format(conn_type.name, uuid.uuid4()) + conn = YaDocsFileS3Connection.create_from_dict( + data_dict=YaDocsFileS3Connection.DataModel( + sources=sources, + ), + ds_key=conn_name, + type_=CONNECTION_TYPE_YADOCS.name, + us_manager=sync_usm, + **kwargs, + ) + sync_usm.save(conn) + return conn diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/us_connection.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/us_connection.py new file mode 100644 index 000000000..8122c876f --- /dev/null +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/us_connection.py @@ -0,0 +1,131 @@ +from __future__ import annotations + +import datetime +import logging +from typing import ( + ClassVar, + Optional, +) + +import attr + +from dl_constants.enums import ( + DataSourceRole, + FileProcessingStatus, +) +from dl_core.services_registry.file_uploader_client_factory import YaDocsFileSourceDesc +from dl_utils.utils import DataKey + +from dl_connector_bundle_chs3.chs3_base.core.us_connection import BaseFileS3Connection +from dl_connector_bundle_chs3.chs3_yadocs.core.constants import SOURCE_TYPE_YADOCS + + +LOGGER = logging.getLogger(__name__) + + +class YaDocsFileS3Connection(BaseFileS3Connection): + source_type = SOURCE_TYPE_YADOCS + allowed_source_types = frozenset((SOURCE_TYPE_YADOCS,)) + + editable_data_source_parameters: ClassVar[ + tuple[str, ...] + ] = BaseFileS3Connection.editable_data_source_parameters + ( + "public_link", + "private_path", + "first_line_is_header", + "data_updated_at", + ) + + @attr.s(eq=False, kw_only=True) + class FileDataSource(BaseFileS3Connection.FileDataSource): + public_link: Optional[str] = attr.ib(default=None) + private_path: Optional[str] = attr.ib(default=None) + first_line_is_header: Optional[bool] = attr.ib(default=None) + data_updated_at: datetime.datetime = attr.ib(factory=lambda: datetime.datetime.now(datetime.timezone.utc)) + + def __hash__(self) -> int: + raw_schema = tuple(self.raw_schema) if self.raw_schema is not None else tuple() + return hash( + ( + self.id, + self.file_id, + self.title, + self.s3_filename, + self.s3_filename_suffix, + raw_schema, + self.status, + self.public_link, + self.private_path, + ) + ) + + def str_for_hash(self) -> str: + return ",".join( + [ + super().str_for_hash(), + str(self.public_link), + str(self.private_path), + ] + ) + + def get_desc(self) -> YaDocsFileSourceDesc: + return YaDocsFileSourceDesc( + file_id=self.file_id, + source_id=self.id, + title=self.title, + raw_schema=self.raw_schema, + preview_id=self.preview_id, + public_link=self.public_link, + private_path=self.private_path, + first_line_is_header=self.first_line_is_header, + ) + + @attr.s(eq=False, kw_only=True) + class DataModel(BaseFileS3Connection.DataModel): + sources: list["YaDocsFileS3Connection.FileDataSource"] = attr.ib() # type: ignore + + oauth_token: Optional[str] = attr.ib(default=None, repr=False) + refresh_enabled: bool = attr.ib(default=False) + + def oldest_data_update_time( + self, exclude_statuses: Optional[set[FileProcessingStatus]] = None + ) -> Optional[datetime.datetime]: + if exclude_statuses is None: + exclude_statuses = set() + + data_updated_list = [src.data_updated_at for src in self.sources if src.status not in exclude_statuses] + if not data_updated_list: + return None + + return min(data_updated_list) + + @classmethod + def get_secret_keys(cls) -> set[DataKey]: + return {DataKey(parts=("oauth_token",))} + + data: DataModel + + @property + def authorized(self) -> bool: + return self.data.oauth_token is not None + + def restore_source_params_from_orig(self, src_id: str, original_version: BaseFileS3Connection) -> None: + orig_src = original_version.get_file_source_by_id(src_id) + assert isinstance(orig_src, YaDocsFileS3Connection.FileDataSource) + self.update_data_source( + src_id, + role=DataSourceRole.origin, + raw_schema=orig_src.raw_schema, + file_id=orig_src.file_id, + s3_filename=orig_src.s3_filename, + s3_filename_suffix=orig_src.s3_filename_suffix, + status=orig_src.status, + preview_id=orig_src.preview_id, + first_line_is_header=orig_src.first_line_is_header, + public_link=orig_src.public_link, + private_path=orig_src.private_path, + ) + + @property + def allow_public_usage(self) -> bool: + return True diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/constants.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/constants.py new file mode 100644 index 000000000..740a32927 --- /dev/null +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/constants.py @@ -0,0 +1,5 @@ +from dl_constants.enums import NotificationType + + +NOTIF_TYPE_STALE_DATA = NotificationType.declare("stale_data") +NOTIF_TYPE_DATA_UPDATE_FAILURE = NotificationType.declare("data_update_failure") diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/locales/en/LC_MESSAGES/dl_connector_bundle_chs3.po b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/locales/en/LC_MESSAGES/dl_connector_bundle_chs3.po index 2031722e9..5e0de07c5 100644 --- a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/locales/en/LC_MESSAGES/dl_connector_bundle_chs3.po +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/locales/en/LC_MESSAGES/dl_connector_bundle_chs3.po @@ -12,5 +12,8 @@ msgstr "" msgid "label_connector-gsheets_v2" msgstr "Google Sheets" +msgid "label_connector-gsheets_v2" +msgstr "Yandex Documents" + msgid "label_connector-file" msgstr "Files" diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/locales/ru/LC_MESSAGES/dl_connector_bundle_chs3.po b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/locales/ru/LC_MESSAGES/dl_connector_bundle_chs3.po index 67f4ea408..e72a117d6 100644 --- a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/locales/ru/LC_MESSAGES/dl_connector_bundle_chs3.po +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/locales/ru/LC_MESSAGES/dl_connector_bundle_chs3.po @@ -12,5 +12,8 @@ msgstr "" msgid "label_connector-gsheets_v2" msgstr "Google Sheets" +msgid "label_connector-yadocs" +msgstr "Яндекс Документы" + msgid "label_connector-file" msgstr "Файлы" diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/config.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/config.py index 672ecbb7b..9a52eb713 100644 --- a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/config.py +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/config.py @@ -11,7 +11,7 @@ host_us_pg=get_test_container_hostport("pg-us", fallback_port=52610).host, port_us_pg_5432=get_test_container_hostport("pg-us", fallback_port=52610).port, us_master_token="AC1ofiek8coB", - core_connector_ep_names=["clickhouse", "file", "gsheets_v2"], + core_connector_ep_names=["clickhouse", "file", "gsheets_v2", "yadocs"], ) SR_CONNECTION_SETTINGS = FileS3ConnectorSettings( @@ -34,7 +34,7 @@ S3_ENDPOINT_URL = f"http://{get_test_container_hostport('s3-storage', fallback_port=52620).as_pair()}" API_TEST_CONFIG = ApiTestEnvironmentConfiguration( - api_connector_ep_names=["clickhouse", "file", "gsheets_v2"], + api_connector_ep_names=["clickhouse", "file", "gsheets_v2", "yadocs"], core_test_config=CORE_TEST_CONFIG, ext_query_executer_secret_key="_some_test_secret_key_", redis_host=get_test_container_hostport("redis", fallback_port=52604).host, diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/__init__.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/api/__init__.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/api/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/api/base.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/api/base.py new file mode 100644 index 000000000..13d2c8886 --- /dev/null +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/api/base.py @@ -0,0 +1,76 @@ +import datetime +import logging + +from aiohttp import web +import pytest + +from dl_api_lib_testing.configuration import ApiTestEnvironmentConfiguration +from dl_core.us_manager.us_manager_async import AsyncUSManager + +from dl_connector_bundle_chs3.chs3_yadocs.core.us_connection import YaDocsFileS3Connection +from dl_connector_bundle_chs3_tests.db.base.api.base import CHS3ConnectionApiTestBase +from dl_connector_bundle_chs3_tests.db.base.api.data import CHS3DataApiTestBase +from dl_connector_bundle_chs3_tests.db.base.api.dataset import CHS3DatasetTestBase +from dl_connector_bundle_chs3_tests.db.yadocs.core.base import BaseYaDocsFileS3TestClass + + +LOGGER = logging.getLogger(__name__) + + +class YaDocsFileS3ApiConnectionTestBase( + BaseYaDocsFileS3TestClass, + CHS3ConnectionApiTestBase[YaDocsFileS3Connection], +): + @pytest.fixture(scope="function") + async def mock_file_uploader_api( + self, + aiohttp_server, + bi_test_config: ApiTestEnvironmentConfiguration, + async_us_manager: AsyncUSManager, + ) -> None: + async def mocked_update_connection_data_internal(request: web.Request) -> web.Response: + req_data = await request.json() + conn_id: str = req_data["connection_id"] + sources_to_update = [src["id"] for src in req_data["sources"]] + + conn = await async_us_manager.get_by_id(conn_id, YaDocsFileS3Connection) + for src_id in sources_to_update: + src: YaDocsFileS3Connection.FileDataSource = conn.get_file_source_by_id(src_id) + src.data_updated_at = datetime.datetime.now(datetime.timezone.utc) + LOGGER.info(f"Successfully updated source id {src_id}") + await async_us_manager.save(conn) + + return web.HTTPOk() + + app = web.Application() + app.router.add_route("POST", "/api/v2/update_connection_data_internal", mocked_update_connection_data_internal) + + server = await aiohttp_server(app, port=bi_test_config.file_uploader_api_port) + + yield + + await server.close() + + @pytest.fixture(scope="function") + def connection_params( + self, + sample_file_data_source: YaDocsFileS3Connection.FileDataSource, + ) -> dict: + return dict( + refresh_enabled=True, + sources=[ + dict( + file_id=sample_file_data_source.file_id, + id=sample_file_data_source.id, + title=sample_file_data_source.title, + ), + ], + ) + + +class YaDocsFileS3DatasetTestBase(YaDocsFileS3ApiConnectionTestBase, CHS3DatasetTestBase[YaDocsFileS3Connection]): + pass + + +class YaDocsFileS3DataApiTestBase(YaDocsFileS3DatasetTestBase, CHS3DataApiTestBase[YaDocsFileS3Connection]): + pass diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/api/test_connection.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/api/test_connection.py new file mode 100644 index 000000000..9abaf2529 --- /dev/null +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/api/test_connection.py @@ -0,0 +1,111 @@ +from http import HTTPStatus +import logging +import uuid + +from flask.testing import FlaskClient +import pytest + +from dl_core.us_manager.us_manager_sync import SyncUSManager +from dl_testing.utils import get_log_record + +from dl_connector_bundle_chs3.chs3_yadocs.core.us_connection import YaDocsFileS3Connection +from dl_connector_bundle_chs3_tests.db.base.api.connection import CHS3ConnectionTestSuite +from dl_connector_bundle_chs3_tests.db.yadocs.api.base import YaDocsFileS3ApiConnectionTestBase + + +class TestYaDocsFileS3Connection(YaDocsFileS3ApiConnectionTestBase, CHS3ConnectionTestSuite[YaDocsFileS3Connection]): + @pytest.fixture(scope="function") + def single_new_conn_source_params(self) -> dict: + return { + "id": str(uuid.uuid4()), + "file_id": str(uuid.uuid4()), + "title": f"New File {str(uuid.uuid4())}", + } + + def test_authorization_field( + self, + client: FlaskClient, + sync_us_manager: SyncUSManager, + saved_connection_id: str, + ) -> None: + conn_id = saved_connection_id + conn = sync_us_manager.get_by_id(conn_id, YaDocsFileS3Connection) + base_update_data = { + "refresh_enabled": True, + "sources": [{"id": src.id, "title": src.title} for src in conn.data.sources], + } + + # no token => not authorized + assert conn.authorized is False + conn_resp = client.get(f"/api/v1/connections/{conn_id}") + assert conn_resp.status_code == HTTPStatus.OK, conn_resp.json + assert conn_resp.json["authorized"] is False + assert "oauth_token" not in conn_resp.json + + # add token into connection + resp = client.put( + "/api/v1/connections/{}".format(conn_id), + json={ + **base_update_data, + "oauth_token": "some_token", + }, + ) + assert resp.status_code == HTTPStatus.OK, resp.json + conn = sync_us_manager.get_by_id(conn_id, YaDocsFileS3Connection) + assert conn.authorized is True + conn_resp = client.get(f"/api/v1/connections/{conn_id}") + assert conn_resp.status_code == HTTPStatus.OK, conn_resp.json + assert conn_resp.json["authorized"] is True + assert "oauth_token" not in conn_resp.json + + # remove token from the connection + resp = client.put( + "/api/v1/connections/{}".format(conn_id), + json={ + **base_update_data, + "oauth_token": None, + }, + ) + assert resp.status_code == HTTPStatus.OK, resp.json + conn: YaDocsFileS3Connection = sync_us_manager.get_by_id(conn_id, YaDocsFileS3Connection) + assert conn.authorized is False + conn_resp = client.get(f"/api/v1/connections/{conn_id}") + assert conn_resp.status_code == HTTPStatus.OK + assert conn_resp.json["authorized"] is False + assert "oauth_token" not in conn_resp.json + + def test_force_update_with_file_id( + self, + caplog, + client: FlaskClient, + sync_us_manager: SyncUSManager, + saved_connection_id: str, + ) -> None: + """Passed file_id to an existing source means that it has been updated => this should trigger data update""" + + caplog.set_level(logging.INFO) + + conn_id = saved_connection_id + usm = sync_us_manager + conn = usm.get_by_id(conn_id, YaDocsFileS3Connection) + + resp = client.put( + "/api/v1/connections/{}".format(conn_id), + json={ + "sources": [ + { + "file_id": str(uuid.uuid4()), # force source update by passing file_id + "id": conn.data.sources[0].id, + "title": conn.data.sources[0].title, + }, + ], + }, + ) + assert resp.status_code == HTTPStatus.OK, resp.json + + schedule_save_src_log_record = get_log_record( + caplog, + predicate=lambda r: r.message.startswith("Scheduled task SaveSourceTask for source_id"), + single=True, + ) + assert conn.data.sources[0].id in schedule_save_src_log_record.message diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/api/test_data.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/api/test_data.py new file mode 100644 index 000000000..e0a059027 --- /dev/null +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/api/test_data.py @@ -0,0 +1,194 @@ +import datetime +from http import HTTPStatus + +from flask.testing import FlaskClient +import pytest + +from dl_api_client.dsmaker.api.data_api import SyncHttpDataApiV2 +from dl_api_client.dsmaker.primitives import Dataset +from dl_api_lib_testing.connector.data_api_suites import ( + DefaultConnectorDataDistinctTestSuite, + DefaultConnectorDataGroupByFormulaTestSuite, + DefaultConnectorDataPreviewTestSuite, + DefaultConnectorDataRangeTestSuite, + DefaultConnectorDataResultTestSuite, +) +from dl_api_lib_testing.data_api_base import DataApiTestParams +from dl_constants.enums import ( + ComponentErrorLevel, + ComponentType, + DataSourceRole, + FileProcessingStatus, +) +from dl_core.us_manager.us_manager_sync import SyncUSManager +from dl_testing.regulated_test import RegulatedTestParams + +from dl_connector_bundle_chs3.chs3_yadocs.core.constants import NOTIF_TYPE_YADOCS_STALE_DATA +from dl_connector_bundle_chs3.chs3_yadocs.core.lifecycle import YaDocsFileS3ConnectionLifecycleManager +from dl_connector_bundle_chs3.chs3_yadocs.core.us_connection import YaDocsFileS3Connection +from dl_connector_bundle_chs3_tests.db.base.api.data import CHS3DataResultTestSuite +from dl_connector_bundle_chs3_tests.db.yadocs.api.base import YaDocsFileS3DataApiTestBase + + +class TestYaDocsFileS3DataResult(YaDocsFileS3DataApiTestBase, CHS3DataResultTestSuite): + test_params = RegulatedTestParams( + mark_features_skipped={ + DefaultConnectorDataResultTestSuite.array_support: "YaDocs connector doesn't support arrays", + }, + ) + + @pytest.mark.asyncio + def test_update_data( + self, + sync_us_manager: SyncUSManager, + saved_connection_id: str, + data_api: SyncHttpDataApiV2, + saved_dataset: Dataset, + data_api_test_params: DataApiTestParams, + mock_file_uploader_api, + ) -> None: + ds = saved_dataset + + # prepare connection sources: set updated time to the current moment + conn = sync_us_manager.get_by_id(saved_connection_id, YaDocsFileS3Connection) + dt_now = datetime.datetime.now(datetime.timezone.utc) + data_updated_at_orig = dt_now + for src in conn.data.sources: + src.data_updated_at = dt_now + sync_us_manager.save(conn) + + def get_notifications_from_result_resp() -> list[dict]: + result_resp = data_api.get_result( + dataset=ds, fields=[ds.find_field(title=data_api_test_params.date_field)], fail_ok=True + ) + return result_resp.json.get("notifications", []) + + # it is not time to update data yet, so we expect no data updates or corresponding notifications + notifications = get_notifications_from_result_resp() + assert all( + notification["locator"] != NOTIF_TYPE_YADOCS_STALE_DATA.value for notification in notifications + ), notifications + conn = sync_us_manager.get_by_id(saved_connection_id, YaDocsFileS3Connection) + assert conn.data.sources[0].data_updated_at == data_updated_at_orig + + # trigger data update by setting the data update time in the connection to N minutes ago + data_updated_at = conn.data.oldest_data_update_time() - datetime.timedelta( + seconds=YaDocsFileS3ConnectionLifecycleManager.STALE_THRESHOLD_SECONDS + 60, # just in case + ) + for src in conn.data.sources: + src.data_updated_at = data_updated_at + sync_us_manager.save(conn) + + # now notifications should be there, as well as connection sources should be updated + notifications = get_notifications_from_result_resp() + assert any( + notification["locator"] == NOTIF_TYPE_YADOCS_STALE_DATA.value for notification in notifications + ), notifications + conn = sync_us_manager.get_by_id(saved_connection_id, YaDocsFileS3Connection) + assert conn.data.sources[0].data_updated_at != data_updated_at + + def test_component_error( + self, + sync_us_manager: SyncUSManager, + saved_connection_id: str, + saved_dataset: Dataset, + client: FlaskClient, + data_api: SyncHttpDataApiV2, + data_api_test_params: DataApiTestParams, + ) -> None: + conn = sync_us_manager.get_by_id(saved_connection_id, YaDocsFileS3Connection) + err_details = {"error": "details", "request-id": "637"} + conn.data.component_errors.add_error( + id=conn.data.sources[0].id, + type=ComponentType.data_source, + message="Custom error message", + code=["FILE", "CUSTOM_FILE_ERROR"], + details=err_details, + ) + conn.update_data_source( + conn.data.sources[0].id, + role=DataSourceRole.origin, + s3_filename=None, + s3_filename_suffix=None, + status=FileProcessingStatus.failed, + preview_id=None, + data_updated_at=datetime.datetime.now(datetime.timezone.utc), + ) + sync_us_manager.save(conn) + + ds = saved_dataset + result_resp = data_api.get_result( + dataset=ds, fields=[ds.find_field(title=data_api_test_params.distinct_field)], fail_ok=True + ) + assert result_resp.status_code == HTTPStatus.BAD_REQUEST, result_resp.json + assert result_resp.json["details"] == err_details + assert result_resp.json["message"] == "Custom error message" + assert result_resp.json["code"] == "ERR.DS_API.SOURCE.FILE.CUSTOM_FILE_ERROR" + + conn_resp = client.get(f"/api/v1/connections/{saved_connection_id}") + assert conn_resp.status_code == HTTPStatus.OK, conn_resp.json + assert conn_resp.json["component_errors"], conn_resp.json + actual_errors = conn_resp.json["component_errors"]["items"][0]["errors"] + assert len(actual_errors) == 1, actual_errors + assert actual_errors[0]["code"] == "ERR.DS_API.SOURCE.FILE.CUSTOM_FILE_ERROR" + + @pytest.mark.asyncio + def test_component_error_warning( + self, + sync_us_manager: SyncUSManager, + saved_connection_id: str, + saved_dataset: Dataset, + data_api: SyncHttpDataApiV2, + data_api_test_params: DataApiTestParams, + mock_file_uploader_api, + ) -> None: + conn = sync_us_manager.get_by_id(saved_connection_id, YaDocsFileS3Connection) + + long_long_ago = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta( + seconds=YaDocsFileS3ConnectionLifecycleManager.STALE_THRESHOLD_SECONDS + 60, # just in case + ) + err_details = {"error": "details", "request-id": "637"} + conn.data.component_errors.add_error( + id=conn.data.sources[0].id, + type=ComponentType.data_source, + message="Custom error message", + code=["FILE", "CUSTOM_FILE_ERROR"], + details=err_details, + level=ComponentErrorLevel.warning, + ) + conn.update_data_source( + conn.data.sources[0].id, + role=DataSourceRole.origin, + data_updated_at=long_long_ago, + ) + sync_us_manager.save(conn) + + ds = saved_dataset + result_resp = data_api.get_result( + dataset=ds, fields=[ds.find_field(title=data_api_test_params.distinct_field)], fail_ok=True + ) + assert result_resp.status_code == HTTPStatus.OK, result_resp.json + assert len(result_resp.json["notifications"]) == 2 + assert "Reason: FILE.CUSTOM_FILE_ERROR, Request-ID: 637" in result_resp.json["notifications"][0]["message"] + conn = sync_us_manager.get_by_id(saved_connection_id, YaDocsFileS3Connection) + assert conn.data.sources[0].data_updated_at > long_long_ago # data update was triggered + + +class TestYaDocsFileS3DataGroupBy(YaDocsFileS3DataApiTestBase, DefaultConnectorDataGroupByFormulaTestSuite): + pass + + +class TestYaDocsFileS3DataRange(YaDocsFileS3DataApiTestBase, DefaultConnectorDataRangeTestSuite): + pass + + +class YaDocsSheetsFileDataDistinct(YaDocsFileS3DataApiTestBase, DefaultConnectorDataDistinctTestSuite): + test_params = RegulatedTestParams( + mark_tests_failed={ + DefaultConnectorDataDistinctTestSuite.test_date_filter_distinct: "FIXME", + } + ) + + +class TestYaDocsFileS3DataPreview(YaDocsFileS3DataApiTestBase, DefaultConnectorDataPreviewTestSuite): + pass diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/api/test_dataset.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/api/test_dataset.py new file mode 100644 index 000000000..b18d403df --- /dev/null +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/api/test_dataset.py @@ -0,0 +1,6 @@ +from dl_connector_bundle_chs3_tests.db.base.api.dataset import CHS3DatasetTestSuite +from dl_connector_bundle_chs3_tests.db.yadocs.api.base import YaDocsFileS3DatasetTestBase + + +class TestYaDocsFileS3Dataset(YaDocsFileS3DatasetTestBase, CHS3DatasetTestSuite): + pass diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/__init__.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/base.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/base.py new file mode 100644 index 000000000..cdf62c464 --- /dev/null +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/base.py @@ -0,0 +1,48 @@ +import uuid + +import pytest + +from dl_constants.enums import FileProcessingStatus +from dl_core.us_manager.us_manager_sync import SyncUSManager +from dl_core_testing.fixtures.primitives import FixtureTableSpec + +from dl_connector_bundle_chs3.chs3_yadocs.core.constants import ( + CONNECTION_TYPE_YADOCS, + SOURCE_TYPE_YADOCS, +) +from dl_connector_bundle_chs3.chs3_yadocs.core.testing.connection import make_saved_yadocs_connection +from dl_connector_bundle_chs3.chs3_yadocs.core.us_connection import YaDocsFileS3Connection +from dl_connector_bundle_chs3_tests.db.base.core.base import BaseCHS3TestClass + + +class BaseYaDocsFileS3TestClass(BaseCHS3TestClass[YaDocsFileS3Connection]): + conn_type = CONNECTION_TYPE_YADOCS + source_type = SOURCE_TYPE_YADOCS + + @pytest.fixture(scope="function") + def sample_file_data_source( + self, + sample_table_spec: FixtureTableSpec, + sample_s3_file: str, + ) -> YaDocsFileS3Connection.FileDataSource: + raw_schema = self._get_raw_schema_for_ch_table(sample_table_spec) + return YaDocsFileS3Connection.FileDataSource( + id=str(uuid.uuid4()), + file_id=str(uuid.uuid4()), + title=sample_s3_file, + s3_filename_suffix=sample_s3_file, + raw_schema=raw_schema, + status=FileProcessingStatus.ready, + ) + + @pytest.fixture(scope="function") + def saved_connection( + self, + sync_us_manager: SyncUSManager, + connection_creation_params: dict, + ) -> YaDocsFileS3Connection: + conn = make_saved_yadocs_connection( + sync_usm=sync_us_manager, + **connection_creation_params, + ) + return conn diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/test_adapter.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/test_adapter.py new file mode 100644 index 000000000..c222b1ca4 --- /dev/null +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/test_adapter.py @@ -0,0 +1,12 @@ +from dl_core_testing.testcases.adapter import BaseAsyncAdapterTestClass + +from dl_connector_bundle_chs3.chs3_base.core.target_dto import BaseFileS3ConnTargetDTO +from dl_connector_bundle_chs3.chs3_yadocs.core.adapter import AsyncYaDocsFileS3Adapter +from dl_connector_bundle_chs3_tests.db.yadocs.core.base import BaseYaDocsFileS3TestClass + + +class TestAsyncYaDocsFileS3Adapter( + BaseYaDocsFileS3TestClass, + BaseAsyncAdapterTestClass[BaseFileS3ConnTargetDTO], +): + ASYNC_ADAPTER_CLS = AsyncYaDocsFileS3Adapter diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/test_connection.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/test_connection.py new file mode 100644 index 000000000..b7b7b5161 --- /dev/null +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/test_connection.py @@ -0,0 +1,9 @@ +from __future__ import annotations + +from dl_connector_bundle_chs3.file.core.us_connection import FileS3Connection +from dl_connector_bundle_chs3_tests.db.base.core.connection import CHS3ConnectionTestBase +from dl_connector_bundle_chs3_tests.db.yadocs.core.base import BaseYaDocsFileS3TestClass + + +class TestYaDocsFileS3Connection(BaseYaDocsFileS3TestClass, CHS3ConnectionTestBase[FileS3Connection]): + pass diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/test_connection_executor.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/test_connection_executor.py new file mode 100644 index 000000000..746ab02b5 --- /dev/null +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/test_connection_executor.py @@ -0,0 +1,20 @@ +from dl_connector_bundle_chs3.file.core.us_connection import FileS3Connection +from dl_connector_bundle_chs3_tests.db.base.core.connection_executor import ( + CHS3AsyncConnectionExecutorTestBase, + CHS3SyncConnectionExecutorTestBase, +) +from dl_connector_bundle_chs3_tests.db.yadocs.core.base import BaseYaDocsFileS3TestClass + + +class TestYaDocsFileS3SyncConnectionExecutor( + BaseYaDocsFileS3TestClass, + CHS3SyncConnectionExecutorTestBase[FileS3Connection], +): + pass + + +class TestYaDocsFileS3AsyncConnectionExecutor( + BaseYaDocsFileS3TestClass, + CHS3AsyncConnectionExecutorTestBase[FileS3Connection], +): + pass diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/test_data_source.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/test_data_source.py new file mode 100644 index 000000000..81939f7fe --- /dev/null +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/test_data_source.py @@ -0,0 +1,12 @@ +from dl_connector_bundle_chs3.chs3_yadocs.core.data_source import YaDocsFileS3DataSource +from dl_connector_bundle_chs3.chs3_yadocs.core.data_source_spec import YaDocsFileS3DataSourceSpec +from dl_connector_bundle_chs3.chs3_yadocs.core.us_connection import YaDocsFileS3Connection +from dl_connector_bundle_chs3_tests.db.base.core.data_source import CHS3TableDataSourceTestBase +from dl_connector_bundle_chs3_tests.db.yadocs.core.base import BaseYaDocsFileS3TestClass + + +class TestYaDocsFileS3TableDataSource( + BaseYaDocsFileS3TestClass, + CHS3TableDataSourceTestBase[YaDocsFileS3Connection, YaDocsFileS3DataSourceSpec, YaDocsFileS3DataSource], +): + DSRC_CLS = YaDocsFileS3DataSource diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/test_dataset.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/test_dataset.py new file mode 100644 index 000000000..71912c931 --- /dev/null +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/test_dataset.py @@ -0,0 +1,7 @@ +from dl_connector_bundle_chs3.chs3_yadocs.core.us_connection import YaDocsFileS3Connection +from dl_connector_bundle_chs3_tests.db.base.core.dataset import CHS3DatasetTestBase +from dl_connector_bundle_chs3_tests.db.yadocs.core.base import BaseYaDocsFileS3TestClass + + +class TestYaDocsFileS3Dataset(BaseYaDocsFileS3TestClass, CHS3DatasetTestBase[YaDocsFileS3Connection]): + pass diff --git a/lib/dl_connector_bundle_chs3/pyproject.toml b/lib/dl_connector_bundle_chs3/pyproject.toml index e9e6672a8..85d82504d 100644 --- a/lib/dl_connector_bundle_chs3/pyproject.toml +++ b/lib/dl_connector_bundle_chs3/pyproject.toml @@ -34,10 +34,12 @@ datalens-task-processor = {path = "../dl_task_processor"} [tool.poetry.plugins."dl_api_lib.connectors"] file = "dl_connector_bundle_chs3.file.api.connector:FileS3ApiConnector" gsheets_v2 = "dl_connector_bundle_chs3.chs3_gsheets.api.connector:GSheetsFileS3ApiConnector" +yadocs = "dl_connector_bundle_chs3.chs3_yadocs.api.connector:YaDocsFileS3ApiConnector" [tool.poetry.plugins."dl_core.connectors"] file = "dl_connector_bundle_chs3.file.core.connector:FileS3CoreConnector" gsheets_v2 = "dl_connector_bundle_chs3.chs3_gsheets.core.connector:GSheetsFileS3CoreConnector" +yadocs = "dl_connector_bundle_chs3.chs3_yadocs.core.connector:YaDocsFileS3CoreConnector" [tool.poetry.group.tests.dependencies] pytest = ">=7.2.2" @@ -61,6 +63,10 @@ target_path = "file" root_dir = "dl_connector_bundle_chs3_tests/db" target_path = "gsheets_v2" +[datalens.pytest.db_yadocs] +root_dir = "dl_connector_bundle_chs3_tests/db" +target_path = "yadocs" + [tool.mypy] warn_unused_configs = true disallow_untyped_defs = true @@ -77,6 +83,8 @@ dl_connector_bundle_chs3 = [ {path = "dl_connector_bundle_chs3/chs3_base/core"}, {path = "dl_connector_bundle_chs3/chs3_gsheets/api"}, {path = "dl_connector_bundle_chs3/chs3_gsheets/core"}, + {path = "dl_connector_bundle_chs3/chs3_yadocs/api"}, + {path = "dl_connector_bundle_chs3/chs3_yadocs/core"}, {path = "dl_connector_bundle_chs3/file/api"}, {path = "dl_connector_bundle_chs3/file/core"}, ] diff --git a/lib/dl_core/dl_core/services_registry/file_uploader_client_factory.py b/lib/dl_core/dl_core/services_registry/file_uploader_client_factory.py index c84ebab69..41c8e0c21 100644 --- a/lib/dl_core/dl_core/services_registry/file_uploader_client_factory.py +++ b/lib/dl_core/dl_core/services_registry/file_uploader_client_factory.py @@ -49,6 +49,13 @@ class GSheetsFileSourceDesc(FileSourceDesc): first_line_is_header: Optional[bool] = attr.ib() +@attr.s(frozen=True, kw_only=True) +class YaDocsFileSourceDesc(FileSourceDesc): + public_link: Optional[str] = attr.ib() + private_path: Optional[str] = attr.ib() + first_line_is_header: Optional[bool] = attr.ib() + + @attr.s(frozen=True) class SourceInternalParams: preview_id: str = attr.ib() @@ -178,7 +185,7 @@ def cleanup_tenant_sync(self, tenant_id: str) -> None: async def update_connection_data_internal( self, conn_id: str, - sources: list[GSheetsFileSourceDesc], + sources: list[GSheetsFileSourceDesc | YaDocsFileSourceDesc], authorized: bool, tenant_id: Optional[str], ) -> None: diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/utils/parsing_utils.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/utils/parsing_utils.py index f085d6334..4f0b081bd 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/utils/parsing_utils.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/utils/parsing_utils.py @@ -43,6 +43,7 @@ ) from dl_connector_bundle_chs3.chs3_gsheets.core.constants import CONNECTION_TYPE_GSHEETS_V2 +from dl_connector_bundle_chs3.chs3_yadocs.core.constants import CONNECTION_TYPE_YADOCS from dl_connector_bundle_chs3.file.core.constants import CONNECTION_TYPE_FILE @@ -160,10 +161,17 @@ def _make_id(self, field: converter_parsing_utils.TResultColumn) -> str: return idx_to_alphabet_notation(field["index"]) # type: ignore # TODO: FIX +@attr.s +class YaDocsFieldIdGenerator(FileUploaderFieldIdGenerator): + def _make_id(self, field: converter_parsing_utils.TResultColumn) -> str: + return idx_to_alphabet_notation(field["index"]) # type: ignore # TODO: FIX + + def get_field_id_generator(conn_type: ConnectionType) -> FileUploaderFieldIdGenerator: field_id_gen_cls_map: dict[ConnectionType, Type[FileUploaderFieldIdGenerator]] = { CONNECTION_TYPE_FILE: FileFieldIdGenerator, CONNECTION_TYPE_GSHEETS_V2: GSheetsFieldIdGenerator, + CONNECTION_TYPE_YADOCS: YaDocsFieldIdGenerator, } if conn_type not in field_id_gen_cls_map: