Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add YaDocs connector #80

Merged
merged 3 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
from dl_constants.enums import NotificationType

from dl_connector_clickhouse.core.clickhouse_base.constants import BACKEND_TYPE_CLICKHOUSE


# Backend type used by the CHS3 connectors: a plain alias of the ClickHouse backend type.
BACKEND_TYPE_CHS3 = BACKEND_TYPE_CLICKHOUSE

# Notification types declared once here so that every CHS3 connector can import
# the same identifiers instead of declaring connector-specific copies.
NOTIF_TYPE_STALE_DATA = NotificationType.declare("stale_data")
NOTIF_TYPE_DATA_UPDATE_FAILURE = NotificationType.declare("data_update_failure")
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
from dl_constants.enums import NotificationLevel
from dl_core.reporting.notifications import BaseNotification

from dl_connector_bundle_chs3.chs3_gsheets.core.constants import (
NOTIF_TYPE_GSHEETS_V2_DATA_UPDATE_FAILURE,
NOTIF_TYPE_GSHEETS_V2_STALE_DATA,
from dl_connector_bundle_chs3.chs3_base.core.constants import (
NOTIF_TYPE_DATA_UPDATE_FAILURE,
NOTIF_TYPE_STALE_DATA,
)


class StaleDataNotification(BaseNotification):
type = NOTIF_TYPE_GSHEETS_V2_STALE_DATA
type = NOTIF_TYPE_STALE_DATA
_title = "Stale data"
_message = "The data has not been updated for more than 30 minutes, a background update is in progress"
_level = NotificationLevel.info
Expand All @@ -22,7 +22,7 @@ def __init__(self, err_code: str, request_id: Optional[str]) -> None:
self.err_code = err_code
self.request_id = request_id or "unknown"

type = NOTIF_TYPE_GSHEETS_V2_DATA_UPDATE_FAILURE
type = NOTIF_TYPE_DATA_UPDATE_FAILURE
_title = "Data update failed"
_message = (
"The displayed data may be outdated due to the failure of the last update.\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
BaseFileS3CoreConnector,
BaseFileS3TableCoreSourceDefinition,
)
from dl_connector_bundle_chs3.chs3_base.core.notifications import (
DataUpdateFailureNotification,
StaleDataNotification,
)
from dl_connector_bundle_chs3.chs3_gsheets.core.adapter import AsyncGSheetsFileS3Adapter
from dl_connector_bundle_chs3.chs3_gsheets.core.connection_executors import GSheetsFileS3AsyncAdapterConnExecutor
from dl_connector_bundle_chs3.chs3_gsheets.core.constants import (
Expand All @@ -12,10 +16,6 @@
from dl_connector_bundle_chs3.chs3_gsheets.core.data_source import GSheetsFileS3DataSource
from dl_connector_bundle_chs3.chs3_gsheets.core.data_source_spec import GSheetsFileS3DataSourceSpec
from dl_connector_bundle_chs3.chs3_gsheets.core.lifecycle import GSheetsFileS3ConnectionLifecycleManager
from dl_connector_bundle_chs3.chs3_gsheets.core.notifications import (
DataUpdateFailureNotification,
StaleDataNotification,
)
from dl_connector_bundle_chs3.chs3_gsheets.core.settings import GSheetsFileS3SettingDefinition
from dl_connector_bundle_chs3.chs3_gsheets.core.storage_schemas.connection import GSheetsFileConnectionDataStorageSchema
from dl_connector_bundle_chs3.chs3_gsheets.core.storage_schemas.data_source_spec import (
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
from dl_constants.enums import (
ConnectionType,
DataSourceType,
NotificationType,
)


# Connection type and data source type identifiers declared for the GSheets v2 connector.
CONNECTION_TYPE_GSHEETS_V2 = ConnectionType.declare("gsheets_v2")
SOURCE_TYPE_GSHEETS_V2 = DataSourceType.declare("GSHEETS_V2")

NOTIF_TYPE_GSHEETS_V2_STALE_DATA = NotificationType.declare("stale_data")
NOTIF_TYPE_GSHEETS_V2_DATA_UPDATE_FAILURE = NotificationType.declare("data_update_failure")
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
from dl_core import exc
from dl_core.reporting.notifications import get_notification_record

from dl_connector_bundle_chs3.chs3_base.core.constants import NOTIF_TYPE_DATA_UPDATE_FAILURE
from dl_connector_bundle_chs3.chs3_base.core.data_source import BaseFileS3DataSource
from dl_connector_bundle_chs3.chs3_gsheets.core.constants import (
CONNECTION_TYPE_GSHEETS_V2,
NOTIF_TYPE_GSHEETS_V2_DATA_UPDATE_FAILURE,
SOURCE_TYPE_GSHEETS_V2,
)

Expand Down Expand Up @@ -44,7 +44,7 @@ class ThisDataSourceError(exc.DataSourceErrorFromComponentError):
# may be generalized in the future
reporting_registry.save_reporting_record(
get_notification_record(
NOTIF_TYPE_GSHEETS_V2_DATA_UPDATE_FAILURE,
NOTIF_TYPE_DATA_UPDATE_FAILURE,
err_code=".".join(single_error.code),
request_id=single_error.details.get("request-id"),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
make_user_auth_headers,
)

from dl_connector_bundle_chs3.chs3_base.core.constants import NOTIF_TYPE_STALE_DATA
from dl_connector_bundle_chs3.chs3_base.core.lifecycle import BaseFileS3ConnectionLifecycleManager
from dl_connector_bundle_chs3.chs3_gsheets.core.constants import NOTIF_TYPE_GSHEETS_V2_STALE_DATA
from dl_connector_bundle_chs3.chs3_gsheets.core.us_connection import GSheetsFileS3Connection


Expand Down Expand Up @@ -40,7 +40,7 @@ async def post_exec_async_hook(self) -> None:
and (dt_now - data_updated_at_all).total_seconds() >= self.STALE_THRESHOLD_SECONDS
):
reporting_registry = self._service_registry.get_reporting_registry()
reporting_registry.save_reporting_record(get_notification_record(NOTIF_TYPE_GSHEETS_V2_STALE_DATA))
reporting_registry.save_reporting_record(get_notification_record(NOTIF_TYPE_STALE_DATA))

data_updated_at = data.oldest_data_update_time(exclude_statuses={FileProcessingStatus.in_progress})
if data_updated_at is None or (dt_now - data_updated_at).total_seconds() < self.STALE_THRESHOLD_SECONDS:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from __future__ import annotations

from marshmallow import (
fields,
validate,
)

from dl_api_connector.api_schema.extras import FieldExtra

from dl_connector_bundle_chs3.chs3_base.api.api_schema.connection import BaseFileS3ConnectionSchema
from dl_connector_bundle_chs3.chs3_yadocs.api.api_schema.source import YaDocsFileSourceSchema
from dl_connector_bundle_chs3.chs3_yadocs.core.us_connection import YaDocsFileS3Connection


class YaDocsFileS3ConnectionSchema(BaseFileS3ConnectionSchema):
    """API schema for ``YaDocsFileS3Connection`` objects.

    Extends the base file-S3 connection schema with the YaDocs source list,
    an OAuth token, the refresh flag and a read-only ``authorized`` flag.
    """

    TARGET_CLS = YaDocsFileS3Connection

    # Nested list of sources stored under ``data.sources``; between 1 and 10
    # entries are accepted.
    sources = fields.Nested(
        YaDocsFileSourceSchema,
        many=True,
        attribute="data.sources",
        bi_extra=FieldExtra(editable=True),
        validate=validate.Length(min=1, max=10),
    )

    # The token is accepted on load only (``load_only``) and may be omitted or null.
    oauth_token = fields.String(
        attribute="data.oauth_token",
        load_default=None,
        allow_none=True,
        load_only=True,
        bi_extra=FieldExtra(editable=True),
    )

    refresh_enabled = fields.Boolean(
        attribute="data.refresh_enabled",
        bi_extra=FieldExtra(editable=True),
    )

    # Output-only flag: it is dumped but never loaded.
    authorized = fields.Boolean(dump_only=True)
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from __future__ import annotations

from marshmallow import fields

from dl_connector_bundle_chs3.chs3_base.api.api_schema.source import BaseFileSourceSchema
from dl_connector_bundle_chs3.chs3_yadocs.core.us_connection import YaDocsFileS3Connection


class YaDocsFileSourceSchema(BaseFileSourceSchema):
    """API schema for a single file source of a YaDocs connection.

    All fields added here are ``dump_only``: they are serialized on output
    and ignored on input.
    """

    class Meta(BaseFileSourceSchema.Meta):
        # Class this schema is bound to.
        target = YaDocsFileS3Connection.FileDataSource

    public_link = fields.String(dump_only=True)
    private_path = fields.String(dump_only=True)
    first_line_is_header = fields.Boolean(dump_only=True)
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from __future__ import annotations

from dl_api_connector.connection_info import ConnectionInfoProvider

from dl_connector_bundle_chs3.chs3_base.api.i18n.localizer import Translatable


class YaDocsFileS3ConnectionInfoProvider(ConnectionInfoProvider):
    """Connection info provider for the YaDocs connector."""

    # Translation key used for the connector title.
    title_translatable = Translatable("label_connector-yadocs")
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from __future__ import annotations

from dl_connector_bundle_chs3.chs3_base.api.connector import (
BaseFileS3ApiConnectionDefinition,
BaseFileS3ApiConnector,
BaseFileS3TableApiSourceDefinition,
)
from dl_connector_bundle_chs3.chs3_yadocs.api.api_schema.connection import YaDocsFileS3ConnectionSchema
from dl_connector_bundle_chs3.chs3_yadocs.api.connection_info import YaDocsFileS3ConnectionInfoProvider
from dl_connector_bundle_chs3.chs3_yadocs.core.connector import (
YaDocsFileS3CoreConnectionDefinition,
YaDocsFileS3CoreConnector,
YaDocsFileS3TableCoreSourceDefinition,
)


class YaDocsFileS3TableApiSourceDefinition(BaseFileS3TableApiSourceDefinition):
    """API-level source definition wrapping the YaDocs core source definition."""

    core_source_def_cls = YaDocsFileS3TableCoreSourceDefinition


class YaDocsFileS3ApiConnectionDefinition(BaseFileS3ApiConnectionDefinition):
    """API-level connection definition for the YaDocs connector.

    Ties together the core connection definition, the API schema and the
    connection info provider.
    """

    core_conn_def_cls = YaDocsFileS3CoreConnectionDefinition
    api_generic_schema_cls = YaDocsFileS3ConnectionSchema
    info_provider_cls = YaDocsFileS3ConnectionInfoProvider


class YaDocsFileS3ApiConnector(BaseFileS3ApiConnector):
    """API connector for YaDocs: one connection definition and one source definition."""

    core_connector_cls = YaDocsFileS3CoreConnector
    connection_definitions = (
        YaDocsFileS3ApiConnectionDefinition,
    )
    source_definitions = (
        YaDocsFileS3TableApiSourceDefinition,
    )
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import logging

from dl_connector_bundle_chs3.chs3_base.core.adapter import BaseAsyncFileS3Adapter
from dl_connector_bundle_chs3.chs3_yadocs.core.constants import CONNECTION_TYPE_YADOCS


LOGGER = logging.getLogger(__name__)


class AsyncYaDocsFileS3Adapter(BaseAsyncFileS3Adapter):
    """Async file-S3 adapter bound to the YaDocs connection type."""

    conn_type = CONNECTION_TYPE_YADOCS
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import attr

from dl_connector_bundle_chs3.chs3_base.core.connection_executors import BaseFileS3AsyncAdapterConnExecutor
from dl_connector_bundle_chs3.chs3_yadocs.core.adapter import AsyncYaDocsFileS3Adapter


@attr.s(cmp=False, hash=False)
class YaDocsFileS3AsyncAdapterConnExecutor(BaseFileS3AsyncAdapterConnExecutor):
    """Connection executor whose target adapter is ``AsyncYaDocsFileS3Adapter``."""

    TARGET_ADAPTER_CLS = AsyncYaDocsFileS3Adapter  # type: ignore  # TODO: FIX
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from dl_connector_bundle_chs3.chs3_base.core.connector import (
BaseFileS3CoreConnectionDefinition,
BaseFileS3CoreConnector,
BaseFileS3TableCoreSourceDefinition,
)
from dl_connector_bundle_chs3.chs3_base.core.notifications import (
DataUpdateFailureNotification,
StaleDataNotification,
)
from dl_connector_bundle_chs3.chs3_yadocs.core.adapter import AsyncYaDocsFileS3Adapter
from dl_connector_bundle_chs3.chs3_yadocs.core.connection_executors import YaDocsFileS3AsyncAdapterConnExecutor
from dl_connector_bundle_chs3.chs3_yadocs.core.constants import (
CONNECTION_TYPE_YADOCS,
SOURCE_TYPE_YADOCS,
)
from dl_connector_bundle_chs3.chs3_yadocs.core.data_source import YaDocsFileS3DataSource
from dl_connector_bundle_chs3.chs3_yadocs.core.data_source_spec import YaDocsFileS3DataSourceSpec
from dl_connector_bundle_chs3.chs3_yadocs.core.lifecycle import YaDocsFileS3ConnectionLifecycleManager
from dl_connector_bundle_chs3.chs3_yadocs.core.settings import YaDocsFileS3SettingDefinition
from dl_connector_bundle_chs3.chs3_yadocs.core.storage_schemas.connection import YaDocsFileConnectionDataStorageSchema
from dl_connector_bundle_chs3.chs3_yadocs.core.storage_schemas.data_source_spec import (
YaDocsFileS3DataSourceSpecStorageSchema,
)
from dl_connector_bundle_chs3.chs3_yadocs.core.us_connection import YaDocsFileS3Connection
from dl_connector_clickhouse.core.clickhouse_base.adapters import ClickHouseAdapter


class YaDocsFileS3CoreConnectionDefinition(BaseFileS3CoreConnectionDefinition):
    """Core connection definition for the YaDocs connector."""

    conn_type = CONNECTION_TYPE_YADOCS
    connection_cls = YaDocsFileS3Connection
    us_storage_schema_cls = YaDocsFileConnectionDataStorageSchema

    # The same executor class is registered for both the sync and async slots.
    sync_conn_executor_cls = YaDocsFileS3AsyncAdapterConnExecutor
    async_conn_executor_cls = YaDocsFileS3AsyncAdapterConnExecutor

    lifecycle_manager_cls = YaDocsFileS3ConnectionLifecycleManager
    dialect_string = "bi_clickhouse"
    settings_definition = YaDocsFileS3SettingDefinition


class YaDocsFileS3TableCoreSourceDefinition(BaseFileS3TableCoreSourceDefinition):
    """Core source definition: YaDocs source type, its class, spec and storage schema."""

    source_type = SOURCE_TYPE_YADOCS
    source_cls = YaDocsFileS3DataSource
    source_spec_cls = YaDocsFileS3DataSourceSpec
    us_storage_schema_cls = YaDocsFileS3DataSourceSpecStorageSchema


class YaDocsFileS3CoreConnector(BaseFileS3CoreConnector):
    """Core connector for YaDocs.

    Registers the connection and source definitions, the adapter classes
    and the two notification classes imported from ``chs3_base``.
    """

    connection_definitions = (
        YaDocsFileS3CoreConnectionDefinition,
    )
    source_definitions = (
        YaDocsFileS3TableCoreSourceDefinition,
    )
    rqe_adapter_classes = frozenset(
        {
            ClickHouseAdapter,
            AsyncYaDocsFileS3Adapter,
        }
    )
    notification_classes = (StaleDataNotification, DataUpdateFailureNotification)
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from dl_constants.enums import (
ConnectionType,
DataSourceType,
)


# Connection type and data source type identifiers declared for the YaDocs connector.
# NOTE(review): the connection type is declared as "docs" while the source type is "YADOCS" - confirm this naming is intended.
CONNECTION_TYPE_YADOCS = ConnectionType.declare("docs")
SOURCE_TYPE_YADOCS = DataSourceType.declare("YADOCS")
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from __future__ import annotations

from dl_constants.enums import (
ComponentErrorLevel,
DataSourceType,
)
from dl_core import exc
from dl_core.reporting.notifications import get_notification_record

from dl_connector_bundle_chs3.chs3_base.core.constants import NOTIF_TYPE_DATA_UPDATE_FAILURE
from dl_connector_bundle_chs3.chs3_base.core.data_source import BaseFileS3DataSource
from dl_connector_bundle_chs3.chs3_yadocs.core.constants import (
CONNECTION_TYPE_YADOCS,
SOURCE_TYPE_YADOCS,
)


class YaDocsFileS3DataSource(BaseFileS3DataSource):
    """File-S3 data source bound to the YaDocs connection type."""

    conn_type = CONNECTION_TYPE_YADOCS

    @classmethod
    def is_compatible_with_type(cls, source_type: DataSourceType) -> bool:
        """Return True only when ``source_type`` is the YaDocs source type."""
        compatible_types = {SOURCE_TYPE_YADOCS}
        return source_type in compatible_types

    def _handle_component_errors(self) -> None:
        """Surface the first component error recorded for this source, if any.

        The error pack is looked up on the connection by ``origin_source_id``.
        When its first error has level ``error``, a dynamically built subclass
        of ``DataSourceErrorFromComponentError`` is raised. For any other
        level a data-update-failure notification record is saved in the
        reporting registry instead.

        Raises:
            exc.DataSourceErrorFromComponentError: (a subclass of it) when the
                first error in the pack has level ``error``.
        """
        source_id = self.origin_source_id
        if source_id is None:
            return

        pack = self.connection.data.component_errors.get_pack(source_id)
        if not pack:
            return

        # Only the first error of the pack is reported.
        first_error = pack.errors[0]

        if first_error.level == ComponentErrorLevel.error:

            class ThisDataSourceError(exc.DataSourceErrorFromComponentError):
                # The component's code is appended to the base error code.
                err_code = exc.DataSourceErrorFromComponentError.err_code + first_error.code
                default_message = first_error.message

            raise ThisDataSourceError(details=first_error.details)

        services_registry = self._get_connection().us_manager.get_services_registry()
        reporting_registry = services_registry.get_reporting_registry()
        # this is the only case of connection component errors at the moment
        # may be generalized in the future
        record = get_notification_record(
            NOTIF_TYPE_DATA_UPDATE_FAILURE,
            err_code=".".join(first_error.code),
            request_id=first_error.details.get("request-id"),
        )
        reporting_registry.save_reporting_record(record)
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import attr

from dl_connector_bundle_chs3.chs3_base.core.data_source_spec import BaseFileS3DataSourceSpec


@attr.s
class YaDocsFileS3DataSourceSpec(BaseFileS3DataSourceSpec):
    """Data source spec for YaDocs; adds nothing to the base file-S3 spec."""
Loading
Loading