Skip to content

Commit

Permalink
Add YaDocs connector
Browse files Browse the repository at this point in the history
  • Loading branch information
vallbull committed Nov 9, 2023
1 parent 5650969 commit 53c7dc3
Show file tree
Hide file tree
Showing 44 changed files with 1,107 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@
NotificationType,
)

from dl_connector_bundle_chs3.constants import (
NOTIF_TYPE_DATA_UPDATE_FAILURE,
NOTIF_TYPE_STALE_DATA,
)


# Identifiers under which the Google Sheets v2 connection and its data source
# type are declared.
CONNECTION_TYPE_GSHEETS_V2 = ConnectionType.declare("gsheets_v2")
SOURCE_TYPE_GSHEETS_V2 = DataSourceType.declare("GSHEETS_V2")

# The notification types are aliases of the constants imported from
# dl_connector_bundle_chs3.constants. The earlier NotificationType.declare(...)
# assignments to these same names were overwritten immediately and have been
# dropped, so each name is now bound exactly once.
NOTIF_TYPE_GSHEETS_V2_STALE_DATA = NOTIF_TYPE_STALE_DATA
NOTIF_TYPE_GSHEETS_V2_DATA_UPDATE_FAILURE = NOTIF_TYPE_DATA_UPDATE_FAILURE
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from __future__ import annotations

from marshmallow import (
fields,
validate,
)

from dl_api_connector.api_schema.extras import FieldExtra

from dl_connector_bundle_chs3.chs3_base.api.api_schema.connection import BaseFileS3ConnectionSchema
from dl_connector_bundle_chs3.chs3_yadocs.api.api_schema.source import YaDocsFileSourceSchema
from dl_connector_bundle_chs3.chs3_yadocs.core.us_connection import YaDocsFileS3Connection


class YaDocsFileS3ConnectionSchema(BaseFileS3ConnectionSchema):
    """API schema for the YaDocs file connection.

    Adds the YaDocs source list, an OAuth token, the refresh switch and a
    dump-only ``authorized`` flag on top of the base file-S3 schema.
    """

    TARGET_CLS = YaDocsFileS3Connection

    # Nested list of sources mapped to ``data.sources``; the length validator
    # accepts between 1 and 10 entries.
    sources = fields.Nested(
        YaDocsFileSourceSchema,
        attribute="data.sources",
        many=True,
        bi_extra=FieldExtra(editable=True),
        validate=validate.Length(min=1, max=10),
    )

    # Optional and nullable; ``load_only`` means it is accepted on input but
    # not included when the schema dumps.
    oauth_token = fields.String(
        attribute="data.oauth_token",
        allow_none=True,
        load_default=None,
        load_only=True,
        bi_extra=FieldExtra(editable=True),
    )

    refresh_enabled = fields.Boolean(
        attribute="data.refresh_enabled",
        bi_extra=FieldExtra(editable=True),
    )

    # Output-only flag.
    authorized = fields.Boolean(dump_only=True)
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from __future__ import annotations

from marshmallow import fields

from dl_connector_bundle_chs3.chs3_base.api.api_schema.source import BaseFileSourceSchema
from dl_connector_bundle_chs3.chs3_yadocs.core.us_connection import YaDocsFileS3Connection


class YaDocsFileSourceSchema(BaseFileSourceSchema):
    """API schema for a single file source of a YaDocs connection."""

    class Meta(BaseFileSourceSchema.Meta):
        # Object type this schema targets.
        target = YaDocsFileS3Connection.FileDataSource

    # The YaDocs-specific fields below are all dump-only (output only).
    public_link = fields.String(dump_only=True)
    private_path = fields.String(dump_only=True)
    first_line_is_header = fields.Boolean(dump_only=True)
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from __future__ import annotations

from dl_api_connector.connection_info import ConnectionInfoProvider

from dl_connector_bundle_chs3.chs3_base.api.i18n.localizer import Translatable


class YaDocsFileS3ConnectionInfoProvider(ConnectionInfoProvider):
    """Connection info provider carrying the translatable YaDocs connector title."""

    title_translatable = Translatable("label_connector-yadocs")
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from __future__ import annotations

from dl_connector_bundle_chs3.chs3_base.api.connector import (
BaseFileS3ApiConnectionDefinition,
BaseFileS3ApiConnector,
BaseFileS3TableApiSourceDefinition,
)
from dl_connector_bundle_chs3.chs3_yadocs.api.api_schema.connection import YaDocsFileS3ConnectionSchema
from dl_connector_bundle_chs3.chs3_yadocs.api.connection_info import YaDocsFileS3ConnectionInfoProvider
from dl_connector_bundle_chs3.chs3_yadocs.core.connector import (
YaDocsFileS3CoreConnectionDefinition,
YaDocsFileS3CoreConnector,
YaDocsFileS3TableCoreSourceDefinition,
)


class YaDocsFileS3TableApiSourceDefinition(BaseFileS3TableApiSourceDefinition):
    """API-level source definition bound to the YaDocs core source definition."""

    core_source_def_cls = YaDocsFileS3TableCoreSourceDefinition


class YaDocsFileS3ApiConnectionDefinition(BaseFileS3ApiConnectionDefinition):
    """API-level connection definition for YaDocs.

    Ties together the core connection definition, the API schema and the
    connection info provider.
    """

    core_conn_def_cls = YaDocsFileS3CoreConnectionDefinition
    api_generic_schema_cls = YaDocsFileS3ConnectionSchema
    info_provider_cls = YaDocsFileS3ConnectionInfoProvider


class YaDocsFileS3ApiConnector(BaseFileS3ApiConnector):
    """API connector exposing the YaDocs connection and source definitions."""

    core_connector_cls = YaDocsFileS3CoreConnector

    # Single-element tuples: one connection definition and one source definition.
    connection_definitions = (YaDocsFileS3ApiConnectionDefinition,)
    source_definitions = (YaDocsFileS3TableApiSourceDefinition,)
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import logging

from dl_connector_bundle_chs3.chs3_base.core.adapter import BaseAsyncFileS3Adapter
from dl_connector_bundle_chs3.chs3_yadocs.core.constants import CONNECTION_TYPE_YADOCS


# Module-level logger named after this module.
LOGGER = logging.getLogger(__name__)


class AsyncYaDocsFileS3Adapter(BaseAsyncFileS3Adapter):
    """Async file-S3 adapter registered for the YaDocs connection type."""

    conn_type = CONNECTION_TYPE_YADOCS
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import attr

from dl_connector_bundle_chs3.chs3_base.core.connection_executors import BaseFileS3AsyncAdapterConnExecutor
from dl_connector_bundle_chs3.chs3_yadocs.core.adapter import AsyncYaDocsFileS3Adapter


@attr.s(cmp=False, hash=False)
class YaDocsFileS3AsyncAdapterConnExecutor(BaseFileS3AsyncAdapterConnExecutor):
    """Connection executor that targets the YaDocs async adapter."""

    TARGET_ADAPTER_CLS = AsyncYaDocsFileS3Adapter  # type: ignore  # TODO: FIX
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from dl_connector_bundle_chs3.chs3_base.core.connector import (
BaseFileS3CoreConnectionDefinition,
BaseFileS3CoreConnector,
BaseFileS3TableCoreSourceDefinition,
)
from dl_connector_bundle_chs3.chs3_yadocs.core.adapter import AsyncYaDocsFileS3Adapter
from dl_connector_bundle_chs3.chs3_yadocs.core.connection_executors import YaDocsFileS3AsyncAdapterConnExecutor
from dl_connector_bundle_chs3.chs3_yadocs.core.constants import (
CONNECTION_TYPE_YADOCS,
SOURCE_TYPE_YADOCS,
)
from dl_connector_bundle_chs3.chs3_yadocs.core.data_source import YaDocsFileS3DataSource
from dl_connector_bundle_chs3.chs3_yadocs.core.data_source_spec import YaDocsFileS3DataSourceSpec
from dl_connector_bundle_chs3.chs3_yadocs.core.lifecycle import YaDocsFileS3ConnectionLifecycleManager
from dl_connector_bundle_chs3.chs3_yadocs.core.notifications import (
DataUpdateFailureNotification,
StaleDataNotification,
)
from dl_connector_bundle_chs3.chs3_yadocs.core.settings import YaDocsFileS3SettingDefinition
from dl_connector_bundle_chs3.chs3_yadocs.core.storage_schemas.connection import YaDocsFileConnectionDataStorageSchema
from dl_connector_bundle_chs3.chs3_yadocs.core.storage_schemas.data_source_spec import (
YaDocsFileS3DataSourceSpecStorageSchema,
)
from dl_connector_bundle_chs3.chs3_yadocs.core.us_connection import YaDocsFileS3Connection
from dl_connector_clickhouse.core.clickhouse_base.adapters import ClickHouseAdapter


class YaDocsFileS3CoreConnectionDefinition(BaseFileS3CoreConnectionDefinition):
    """Core connection definition for YaDocs.

    Wires the connection class, storage schema, executors, lifecycle manager
    and settings definition for the YaDocs connection type.
    """

    conn_type = CONNECTION_TYPE_YADOCS
    connection_cls = YaDocsFileS3Connection
    us_storage_schema_cls = YaDocsFileConnectionDataStorageSchema

    # Both the sync and the async executor slots use the same
    # async-adapter executor class.
    sync_conn_executor_cls = YaDocsFileS3AsyncAdapterConnExecutor
    async_conn_executor_cls = YaDocsFileS3AsyncAdapterConnExecutor

    lifecycle_manager_cls = YaDocsFileS3ConnectionLifecycleManager
    dialect_string = "bi_clickhouse"
    settings_definition = YaDocsFileS3SettingDefinition


class YaDocsFileS3TableCoreSourceDefinition(BaseFileS3TableCoreSourceDefinition):
    """Core source definition for the YaDocs source type.

    Binds the data source class, its spec class and the spec storage schema.
    """

    source_type = SOURCE_TYPE_YADOCS
    source_cls = YaDocsFileS3DataSource
    source_spec_cls = YaDocsFileS3DataSourceSpec
    us_storage_schema_cls = YaDocsFileS3DataSourceSpecStorageSchema


class YaDocsFileS3CoreConnector(BaseFileS3CoreConnector):
    """Core connector bundling the YaDocs definitions, adapters and notifications."""

    connection_definitions = (YaDocsFileS3CoreConnectionDefinition,)
    source_definitions = (YaDocsFileS3TableCoreSourceDefinition,)

    # Adapter classes listed for RQE: the ClickHouse adapter and the YaDocs async adapter.
    rqe_adapter_classes = frozenset((ClickHouseAdapter, AsyncYaDocsFileS3Adapter))

    notification_classes = (StaleDataNotification, DataUpdateFailureNotification)
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from dl_constants.enums import (
ConnectionType,
DataSourceType,
NotificationType,
)
from dl_connector_bundle_chs3.constants import NOTIF_TYPE_STALE_DATA, NOTIF_TYPE_DATA_UPDATE_FAILURE

# Identifiers under which the YaDocs connection and its data source type are declared.
CONNECTION_TYPE_YADOCS = ConnectionType.declare("yadocs")
SOURCE_TYPE_YADOCS = DataSourceType.declare("YADOCS")

# Aliases of the notification types imported from dl_connector_bundle_chs3.constants;
# nothing new is declared here.
NOTIF_TYPE_YADOCS_STALE_DATA = NOTIF_TYPE_STALE_DATA
NOTIF_TYPE_YADOCS_DATA_UPDATE_FAILURE = NOTIF_TYPE_DATA_UPDATE_FAILURE
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from __future__ import annotations

from dl_constants.enums import (
ComponentErrorLevel,
DataSourceType,
)
from dl_core import exc
from dl_core.reporting.notifications import get_notification_record

from dl_connector_bundle_chs3.chs3_base.core.data_source import BaseFileS3DataSource
from dl_connector_bundle_chs3.chs3_yadocs.core.constants import (
CONNECTION_TYPE_YADOCS,
NOTIF_TYPE_YADOCS_DATA_UPDATE_FAILURE,
SOURCE_TYPE_YADOCS,
)


class YaDocsFileS3DataSource(BaseFileS3DataSource):
    """File-S3 data source bound to the YaDocs connection type."""

    conn_type = CONNECTION_TYPE_YADOCS

    @classmethod
    def is_compatible_with_type(cls, source_type: DataSourceType) -> bool:
        """Return True only when ``source_type`` is the YaDocs source type."""
        compatible_types = {SOURCE_TYPE_YADOCS}
        return source_type in compatible_types

    def _handle_component_errors(self) -> None:
        """Surface the component error recorded for this source, if any.

        Only the first error of the source's error pack is examined. An error
        of level ``error`` is raised as a data source exception; an error of
        any other level is saved as a data-update-failure notification.

        Raises:
            exc.DataSourceErrorFromComponentError: a subclass built on the fly
                when the first component error has level ``error``.
        """
        source_id = self.origin_source_id
        if source_id is None:
            return

        pack = self.connection.data.component_errors.get_pack(source_id)
        if not pack:
            return

        first_error = pack.errors[0]

        if first_error.level == ComponentErrorLevel.error:
            # One-off exception class so that the error code and default
            # message are taken from the component error itself.
            class ThisDataSourceError(exc.DataSourceErrorFromComponentError):
                err_code = exc.DataSourceErrorFromComponentError.err_code + first_error.code
                default_message = first_error.message

            raise ThisDataSourceError(details=first_error.details)

        # this is the only case of connection component errors at the moment
        # may be generalized in the future
        services_registry = self._get_connection().us_manager.get_services_registry()
        reporting_registry = services_registry.get_reporting_registry()
        record = get_notification_record(
            NOTIF_TYPE_YADOCS_DATA_UPDATE_FAILURE,
            err_code=".".join(first_error.code),
            request_id=first_error.details.get("request-id"),
        )
        reporting_registry.save_reporting_record(record)
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import attr

from dl_connector_bundle_chs3.chs3_base.core.data_source_spec import BaseFileS3DataSourceSpec


@attr.s
class YaDocsFileS3DataSourceSpec(BaseFileS3DataSourceSpec):
    """Data source spec for YaDocs; adds nothing to the base file-S3 spec."""
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import datetime
from typing import ClassVar

from dl_constants.enums import FileProcessingStatus
from dl_core.connectors.base.lifecycle import ConnectionLifecycleManager
from dl_core.reporting.notifications import get_notification_record
from dl_core.utils import (
make_user_auth_cookies,
make_user_auth_headers,
)

from dl_connector_bundle_chs3.chs3_base.core.lifecycle import BaseFileS3ConnectionLifecycleManager
from dl_connector_bundle_chs3.chs3_yadocs.core.constants import NOTIF_TYPE_YADOCS_STALE_DATA
from dl_connector_bundle_chs3.chs3_yadocs.core.us_connection import YaDocsFileS3Connection


class YaDocsFileS3ConnectionLifecycleManager(
    BaseFileS3ConnectionLifecycleManager,
    ConnectionLifecycleManager[YaDocsFileS3Connection],
):
    """Lifecycle manager for YaDocs connections.

    After execution it reports stale data and asks the file uploader to
    update the connection's sources.
    """

    ENTRY_CLS = YaDocsFileS3Connection

    # Age in seconds (30 minutes) from which the data is treated as stale.
    STALE_THRESHOLD_SECONDS: ClassVar[int] = 30 * 60

    async def post_exec_async_hook(self) -> None:
        """Report stale data and request a data update when refresh is enabled.

        Does nothing beyond the parent hook when ``refresh_enabled`` is falsy.
        Otherwise:

        1. If the oldest update time across all sources is at least
           ``STALE_THRESHOLD_SECONDS`` old, a stale-data notification is saved.
        2. If the oldest update time, ignoring sources whose status is
           ``in_progress``, is also at least that old, the file uploader
           client is asked to update the connection data.
        """
        await super().post_exec_async_hook()

        # The asserts narrow the entry and its data to the YaDocs types.
        assert isinstance(self.entry, YaDocsFileS3Connection)
        assert isinstance(self.entry.data, YaDocsFileS3Connection.DataModel)
        conn_data = self.entry.data

        if not conn_data.refresh_enabled:
            return

        now = datetime.datetime.now(datetime.timezone.utc)
        threshold = self.STALE_THRESHOLD_SECONDS

        # Step 1: notify about stale data, with every source taken into account.
        oldest_overall = conn_data.oldest_data_update_time()
        if oldest_overall is not None:
            overall_age = (now - oldest_overall).total_seconds()
            if overall_age >= threshold:
                registry = self._service_registry.get_reporting_registry()
                registry.save_reporting_record(get_notification_record(NOTIF_TYPE_YADOCS_STALE_DATA))

        # Step 2: decide whether to request an update; in-progress sources are
        # excluded here (presumably so a running update is not re-triggered).
        oldest_settled = conn_data.oldest_data_update_time(exclude_statuses={FileProcessingStatus.in_progress})
        if oldest_settled is None:
            return
        if (now - oldest_settled).total_seconds() < threshold:
            return

        # The file uploader client is created with the user's auth headers and cookies.
        client_factory = self._service_registry.get_file_uploader_client_factory()
        rci = self._us_manager.bi_context
        auth_headers = make_user_auth_headers(rci=rci)
        auth_cookies = make_user_auth_cookies(rci=rci)
        uploader_client = client_factory.get_client(headers=auth_headers, cookies=auth_cookies)

        source_descriptions = [source.get_desc() for source in conn_data.sources]
        assert self.entry.uuid is not None
        await uploader_client.update_connection_data_internal(
            conn_id=self.entry.uuid,
            sources=source_descriptions,
            authorized=self.entry.authorized,
            tenant_id=rci.tenant.get_tenant_id() if rci.tenant is not None else None,
        )
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from typing import Optional

from dl_constants.enums import NotificationLevel
from dl_core.reporting.notifications import BaseNotification

from dl_connector_bundle_chs3.chs3_yadocs.core.constants import (
NOTIF_TYPE_YADOCS_DATA_UPDATE_FAILURE,
NOTIF_TYPE_YADOCS_STALE_DATA,
)


class StaleDataNotification(BaseNotification):
    """Info-level notification saying the data is stale and an update is running."""

    type = NOTIF_TYPE_YADOCS_STALE_DATA
    _level = NotificationLevel.info
    _title = "Stale data"
    _message = "The data has not been updated for more than 30 minutes, a background update is in progress"


class DataUpdateFailureNotification(BaseNotification):
    """Warning-level notification about a failed data update.

    The message is rendered with the error code and request id supplied at
    construction time.
    """

    type = NOTIF_TYPE_YADOCS_DATA_UPDATE_FAILURE
    _level = NotificationLevel.warning
    _title = "Data update failed"
    _message = (
        "The displayed data may be outdated due to the failure of the last update.\n"
        "Reason: {err_code}, Request-ID: {request_id}."
    )

    def __init__(self, err_code: str, request_id: Optional[str]) -> None:
        """Store the error code and request id used to render the message.

        Args:
            err_code: Error code shown as the failure reason.
            request_id: Request id of the failed update; a falsy value is
                replaced with ``"unknown"``.
        """
        super().__init__()
        self.err_code = err_code
        self.request_id = request_id or "unknown"

    @property
    def message(self) -> str:
        """Return the message template filled with this instance's details."""
        return self._message.format(
            err_code=self.err_code,
            request_id=self.request_id,
        )
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from dl_configs.connectors_settings import (
ConnectorsConfigType,
ConnectorSettingsBase,
)
from dl_configs.settings_loaders.meta_definition import required
from dl_core.connectors.settings.primitives import (
ConnectorSettingsDefinition,
get_connectors_settings_config,
)

from dl_connector_bundle_chs3.chs3_base.core.settings import (
ConnectorsDataFileBase,
FileS3ConnectorSettings,
)


def yadocs_file_s3_settings_fallback(full_cfg: ConnectorsConfigType) -> dict[str, ConnectorSettingsBase]:
    """Build fallback YaDocs connector settings from the full connectors config.

    Args:
        full_cfg: Full config; the ``FILE`` connector config is looked up in
            it, and the S3 endpoint and bucket are read from it directly.

    Returns:
        An empty dict when no ``FILE`` config is found, otherwise a one-entry
        mapping ``{"YADOCS": FileS3ConnectorSettings(...)}``.
    """
    file_cfg = get_connectors_settings_config(
        full_cfg,
        object_like_config_key="FILE",
        connector_data_class=ConnectorsDataFileBase,
    )
    if file_cfg is None:
        return {}

    # Host, port and username come from the FILE config; the password and the
    # S3 keys are given as ``required(str)`` markers instead of config values.
    yadocs_settings = FileS3ConnectorSettings(  # type: ignore
        HOST=file_cfg.CONN_FILE_CH_HOST,
        PORT=file_cfg.CONN_FILE_CH_PORT,
        USERNAME=file_cfg.CONN_FILE_CH_USERNAME,
        PASSWORD=required(str),
        ACCESS_KEY_ID=required(str),
        SECRET_ACCESS_KEY=required(str),
        S3_ENDPOINT=full_cfg.S3_ENDPOINT_URL,
        BUCKET=full_cfg.FILE_UPLOADER_S3_PERSISTENT_BUCKET_NAME,
    )
    return {"YADOCS": yadocs_settings}


class YaDocsFileS3SettingDefinition(ConnectorSettingsDefinition):
    """Settings definition for YaDocs: the settings class plus its fallback builder."""

    settings_class = FileS3ConnectorSettings
    fallback = yadocs_file_s3_settings_fallback
Loading

0 comments on commit 53c7dc3

Please sign in to comment.