diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/api/api_schema/source.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/api/api_schema/source.py index 56de4c831..6cf3a94bd 100644 --- a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/api/api_schema/source.py +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/api/api_schema/source.py @@ -12,4 +12,5 @@ class Meta(BaseFileSourceSchema.Meta): public_link = fields.String(dump_only=True) private_path = fields.String(dump_only=True) + sheet_id = fields.String(dump_only=True) first_line_is_header = fields.Boolean(dump_only=True) diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/api/connection_info.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/api/connection_info.py index 572dc0f0f..11e18f5a3 100644 --- a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/api/connection_info.py +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/api/connection_info.py @@ -6,4 +6,4 @@ class YaDocsFileS3ConnectionInfoProvider(ConnectionInfoProvider): - title_translatable = Translatable("label_connector-yadocs") + title_translatable = Translatable("label_connector-docs") diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/adapter.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/adapter.py index dc802a124..466f00e18 100644 --- a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/adapter.py +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/adapter.py @@ -1,11 +1,11 @@ import logging from dl_connector_bundle_chs3.chs3_base.core.adapter import BaseAsyncFileS3Adapter -from dl_connector_bundle_chs3.chs3_yadocs.core.constants import CONNECTION_TYPE_YADOCS +from dl_connector_bundle_chs3.chs3_yadocs.core.constants import CONNECTION_TYPE_DOCS LOGGER = logging.getLogger(__name__) class AsyncYaDocsFileS3Adapter(BaseAsyncFileS3Adapter): - conn_type = CONNECTION_TYPE_YADOCS + conn_type = CONNECTION_TYPE_DOCS diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/connector.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/connector.py index d6daf9915..2d6bacf53 100644 --- a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/connector.py +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/connector.py @@ -10,8 +10,8 @@ from dl_connector_bundle_chs3.chs3_yadocs.core.adapter import AsyncYaDocsFileS3Adapter from dl_connector_bundle_chs3.chs3_yadocs.core.connection_executors import YaDocsFileS3AsyncAdapterConnExecutor from dl_connector_bundle_chs3.chs3_yadocs.core.constants import ( - CONNECTION_TYPE_YADOCS, - SOURCE_TYPE_YADOCS, + CONNECTION_TYPE_DOCS, + SOURCE_TYPE_DOCS, ) from dl_connector_bundle_chs3.chs3_yadocs.core.data_source import YaDocsFileS3DataSource from dl_connector_bundle_chs3.chs3_yadocs.core.data_source_spec import YaDocsFileS3DataSourceSpec @@ -26,7 +26,7 @@ class YaDocsFileS3CoreConnectionDefinition(BaseFileS3CoreConnectionDefinition): - conn_type = CONNECTION_TYPE_YADOCS + conn_type = CONNECTION_TYPE_DOCS connection_cls = YaDocsFileS3Connection us_storage_schema_cls = YaDocsFileConnectionDataStorageSchema sync_conn_executor_cls = YaDocsFileS3AsyncAdapterConnExecutor @@ -37,7 +37,7 @@ class YaDocsFileS3CoreConnectionDefinition(BaseFileS3CoreConnectionDefinition): class YaDocsFileS3TableCoreSourceDefinition(BaseFileS3TableCoreSourceDefinition): - source_type = SOURCE_TYPE_YADOCS + source_type = SOURCE_TYPE_DOCS source_cls = YaDocsFileS3DataSource source_spec_cls = YaDocsFileS3DataSourceSpec us_storage_schema_cls = YaDocsFileS3DataSourceSpecStorageSchema diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/constants.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/constants.py index 0b3eae3c7..f05c5e9bf 100644 --- a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/constants.py +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/constants.py @@ -4,5 +4,5 @@ ) -CONNECTION_TYPE_YADOCS = ConnectionType.declare("docs") -SOURCE_TYPE_YADOCS = DataSourceType.declare("YADOCS") +CONNECTION_TYPE_DOCS = ConnectionType.declare("docs") +SOURCE_TYPE_DOCS = DataSourceType.declare("DOCS") diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/data_source.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/data_source.py index 9e72d8187..9cdd3b6b7 100644 --- a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/data_source.py +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/data_source.py @@ -10,18 +10,18 @@ from dl_connector_bundle_chs3.chs3_base.core.constants import NOTIF_TYPE_DATA_UPDATE_FAILURE from dl_connector_bundle_chs3.chs3_base.core.data_source import BaseFileS3DataSource from dl_connector_bundle_chs3.chs3_yadocs.core.constants import ( - CONNECTION_TYPE_YADOCS, - SOURCE_TYPE_YADOCS, + CONNECTION_TYPE_DOCS, + SOURCE_TYPE_DOCS, ) class YaDocsFileS3DataSource(BaseFileS3DataSource): - conn_type = CONNECTION_TYPE_YADOCS + conn_type = CONNECTION_TYPE_DOCS @classmethod def is_compatible_with_type(cls, source_type: DataSourceType) -> bool: return source_type in { - SOURCE_TYPE_YADOCS, + SOURCE_TYPE_DOCS, } def _handle_component_errors(self) -> None: diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/settings.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/settings.py index 58c2053a7..b855b161c 100644 --- a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/settings.py +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/settings.py @@ -23,7 +23,7 @@ def yadocs_file_s3_settings_fallback(full_cfg: ConnectorsConfigType) -> dict[str if cfg is None: return {} return dict( - YADOCS=FileS3ConnectorSettings( # type: ignore + DOCS=FileS3ConnectorSettings( # type: ignore HOST=cfg.CONN_FILE_CH_HOST, PORT=cfg.CONN_FILE_CH_PORT, USERNAME=cfg.CONN_FILE_CH_USERNAME, diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/testing/connection.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/testing/connection.py index 4b68027ea..81aaa890b 100644 --- a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/testing/connection.py +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/testing/connection.py @@ -5,7 +5,7 @@ from dl_core.us_manager.us_manager_sync import SyncUSManager -from dl_connector_bundle_chs3.chs3_yadocs.core.constants import CONNECTION_TYPE_YADOCS +from dl_connector_bundle_chs3.chs3_yadocs.core.constants import CONNECTION_TYPE_DOCS from dl_connector_bundle_chs3.chs3_yadocs.core.us_connection import YaDocsFileS3Connection @@ -14,7 +14,7 @@ def make_saved_yadocs_connection( sources: list[YaDocsFileS3Connection.FileDataSource], **kwargs: Any, ) -> YaDocsFileS3Connection: - conn_type = CONNECTION_TYPE_YADOCS + conn_type = CONNECTION_TYPE_DOCS conn_name = "{} test conn {}".format(conn_type.name, uuid.uuid4()) conn = YaDocsFileS3Connection.create_from_dict( @@ -22,7 +22,7 @@ def make_saved_yadocs_connection( sources=sources, ), ds_key=conn_name, - type_=CONNECTION_TYPE_YADOCS.name, + type_=CONNECTION_TYPE_DOCS.name, us_manager=sync_usm, **kwargs, ) diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/us_connection.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/us_connection.py index 8122c876f..e50aa0cd3 100644 --- a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/us_connection.py +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/chs3_yadocs/core/us_connection.py @@ -17,21 +17,22 @@ from dl_utils.utils import DataKey from dl_connector_bundle_chs3.chs3_base.core.us_connection import BaseFileS3Connection -from dl_connector_bundle_chs3.chs3_yadocs.core.constants import SOURCE_TYPE_YADOCS +from dl_connector_bundle_chs3.chs3_yadocs.core.constants import SOURCE_TYPE_DOCS LOGGER = logging.getLogger(__name__) class YaDocsFileS3Connection(BaseFileS3Connection): - source_type = SOURCE_TYPE_YADOCS - allowed_source_types = frozenset((SOURCE_TYPE_YADOCS,)) + source_type = SOURCE_TYPE_DOCS + allowed_source_types = frozenset((SOURCE_TYPE_DOCS,)) editable_data_source_parameters: ClassVar[ tuple[str, ...] ] = BaseFileS3Connection.editable_data_source_parameters + ( "public_link", "private_path", + "sheet_id", "first_line_is_header", "data_updated_at", ) @@ -40,6 +41,7 @@ class YaDocsFileS3Connection(BaseFileS3Connection): class FileDataSource(BaseFileS3Connection.FileDataSource): public_link: Optional[str] = attr.ib(default=None) private_path: Optional[str] = attr.ib(default=None) + sheet_id: Optional[str] = attr.ib(default=None) first_line_is_header: Optional[bool] = attr.ib(default=None) data_updated_at: datetime.datetime = attr.ib(factory=lambda: datetime.datetime.now(datetime.timezone.utc)) @@ -54,6 +56,7 @@ def __hash__(self) -> int: self.s3_filename_suffix, raw_schema, self.status, + self.sheet_id, self.public_link, self.private_path, ) @@ -65,6 +68,7 @@ def str_for_hash(self) -> str: super().str_for_hash(), str(self.public_link), str(self.private_path), + str(self.sheet_id), ] ) @@ -77,6 +81,7 @@ def get_desc(self) -> YaDocsFileSourceDesc: preview_id=self.preview_id, public_link=self.public_link, private_path=self.private_path, + sheet_id=self.sheet_id, first_line_is_header=self.first_line_is_header, ) @@ -124,6 +129,7 @@ def restore_source_params_from_orig(self, src_id: str, original_version: BaseFil first_line_is_header=orig_src.first_line_is_header, public_link=orig_src.public_link, private_path=orig_src.private_path, + sheet_id=orig_src.sheet_id, ) @property diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/locales/en/LC_MESSAGES/dl_connector_bundle_chs3.mo b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/locales/en/LC_MESSAGES/dl_connector_bundle_chs3.mo index 95b68acf0..a3726e21d 100644 Binary files a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/locales/en/LC_MESSAGES/dl_connector_bundle_chs3.mo and b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/locales/en/LC_MESSAGES/dl_connector_bundle_chs3.mo differ diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/locales/en/LC_MESSAGES/dl_connector_bundle_chs3.po b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/locales/en/LC_MESSAGES/dl_connector_bundle_chs3.po index 7a89240cb..05755b8e5 100644 --- a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/locales/en/LC_MESSAGES/dl_connector_bundle_chs3.po +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/locales/en/LC_MESSAGES/dl_connector_bundle_chs3.po @@ -4,7 +4,7 @@ msgid "" msgstr "" "Project-Id-Version: PACKAGE VERSION\n" "Report-Msgid-Bugs-To: datalens-opensource@yandex-team.ru\n" -"POT-Creation-Date: 2023-11-09 14:12+0000\n" +"POT-Creation-Date: 2023-09-22 08:16+0000\n" "MIME-Version: 1.0\n" "Content-Type: text/plain; charset=UTF-8\n" "Content-Transfer-Encoding: 8bit\n" @@ -12,7 +12,7 @@ msgstr "" msgid "label_connector-gsheets_v2" msgstr "Google Sheets" -msgid "label_connector-yadocs" +msgid "label_connector-docs" msgstr "Yandex Documents" msgid "label_connector-file" diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/locales/ru/LC_MESSAGES/dl_connector_bundle_chs3.mo b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/locales/ru/LC_MESSAGES/dl_connector_bundle_chs3.mo index 6ddb57b43..631622cdd 100644 Binary files a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/locales/ru/LC_MESSAGES/dl_connector_bundle_chs3.mo and b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/locales/ru/LC_MESSAGES/dl_connector_bundle_chs3.mo differ diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/locales/ru/LC_MESSAGES/dl_connector_bundle_chs3.po b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/locales/ru/LC_MESSAGES/dl_connector_bundle_chs3.po index aa61f7320..e9adc3287 100644 --- a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/locales/ru/LC_MESSAGES/dl_connector_bundle_chs3.po +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3/locales/ru/LC_MESSAGES/dl_connector_bundle_chs3.po @@ -4,7 +4,7 @@ msgid "" msgstr "" "Project-Id-Version: PACKAGE VERSION\n" "Report-Msgid-Bugs-To: datalens-opensource@yandex-team.ru\n" -"POT-Creation-Date: 2023-11-09 14:12+0000\n" +"POT-Creation-Date: 2023-09-22 08:16+0000\n" "MIME-Version: 1.0\n" "Content-Type: text/plain; charset=UTF-8\n" "Content-Transfer-Encoding: 8bit\n" @@ -12,7 +12,7 @@ msgstr "" msgid "label_connector-gsheets_v2" msgstr "Google Sheets" -msgid "label_connector-yadocs" +msgid "label_connector-docs" msgstr "Яндекс Документы" msgid "label_connector-file" diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/config.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/config.py index df42bcf2d..d578d72da 100644 --- a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/config.py +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/config.py @@ -11,7 +11,7 @@ host_us_pg=get_test_container_hostport("pg-us", fallback_port=52610).host, port_us_pg_5432=get_test_container_hostport("pg-us", fallback_port=52610).port, us_master_token="AC1ofiek8coB", - core_connector_ep_names=["clickhouse", "file", "gsheets_v2", "yadocs"], + core_connector_ep_names=["clickhouse", "file", "gsheets_v2", "docs"], redis_host=get_test_container_hostport("redis", fallback_port=52604).host, redis_port=get_test_container_hostport("redis", fallback_port=52604).port, redis_password="AwockEuvavDyinmeakmiRiopanbesBepsensUrdIz5", @@ -37,7 +37,7 @@ S3_ENDPOINT_URL = f"http://{get_test_container_hostport('s3-storage', fallback_port=52620).as_pair()}" API_TEST_CONFIG = ApiTestEnvironmentConfiguration( - api_connector_ep_names=["clickhouse", "file", "gsheets_v2", "yadocs"], + api_connector_ep_names=["clickhouse", "file", "gsheets_v2", "docs"], core_test_config=CORE_TEST_CONFIG, ext_query_executer_secret_key="_some_test_secret_key_", ) diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/__init__.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/__init__.py similarity index 100% rename from lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/__init__.py rename to lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/__init__.py diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/api/__init__.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/api/__init__.py similarity index 100% rename from lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/api/__init__.py rename to lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/api/__init__.py diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/api/base.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/api/base.py similarity index 96% rename from lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/api/base.py rename to lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/api/base.py index 13d2c8886..f0d67928d 100644 --- a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/api/base.py +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/api/base.py @@ -11,7 +11,7 @@ from dl_connector_bundle_chs3_tests.db.base.api.base import CHS3ConnectionApiTestBase from dl_connector_bundle_chs3_tests.db.base.api.data import CHS3DataApiTestBase from dl_connector_bundle_chs3_tests.db.base.api.dataset import CHS3DatasetTestBase -from dl_connector_bundle_chs3_tests.db.yadocs.core.base import BaseYaDocsFileS3TestClass +from dl_connector_bundle_chs3_tests.db.docs.core.base import BaseYaDocsFileS3TestClass LOGGER = logging.getLogger(__name__) diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/api/test_connection.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/api/test_connection.py similarity index 97% rename from lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/api/test_connection.py rename to lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/api/test_connection.py index 9abaf2529..28d7bba7d 100644 --- a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/api/test_connection.py +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/api/test_connection.py @@ -10,7 +10,7 @@ from dl_connector_bundle_chs3.chs3_yadocs.core.us_connection import YaDocsFileS3Connection from dl_connector_bundle_chs3_tests.db.base.api.connection import CHS3ConnectionTestSuite -from dl_connector_bundle_chs3_tests.db.yadocs.api.base import YaDocsFileS3ApiConnectionTestBase +from dl_connector_bundle_chs3_tests.db.docs.api.base import YaDocsFileS3ApiConnectionTestBase class TestYaDocsFileS3Connection(YaDocsFileS3ApiConnectionTestBase, CHS3ConnectionTestSuite[YaDocsFileS3Connection]): diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/api/test_data.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/api/test_data.py similarity index 98% rename from lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/api/test_data.py rename to lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/api/test_data.py index 93abd67e2..909df06a5 100644 --- a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/api/test_data.py +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/api/test_data.py @@ -27,7 +27,7 @@ from dl_connector_bundle_chs3.chs3_yadocs.core.lifecycle import YaDocsFileS3ConnectionLifecycleManager from dl_connector_bundle_chs3.chs3_yadocs.core.us_connection import YaDocsFileS3Connection from dl_connector_bundle_chs3_tests.db.base.api.data import CHS3DataResultTestSuite -from dl_connector_bundle_chs3_tests.db.yadocs.api.base import YaDocsFileS3DataApiTestBase +from dl_connector_bundle_chs3_tests.db.docs.api.base import YaDocsFileS3DataApiTestBase class TestYaDocsFileS3DataResult(YaDocsFileS3DataApiTestBase, CHS3DataResultTestSuite): diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/api/test_dataset.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/api/test_dataset.py similarity index 66% rename from lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/api/test_dataset.py rename to lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/api/test_dataset.py index b18d403df..141332b62 100644 --- a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/api/test_dataset.py +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/api/test_dataset.py @@ -1,5 +1,5 @@ from dl_connector_bundle_chs3_tests.db.base.api.dataset import CHS3DatasetTestSuite -from dl_connector_bundle_chs3_tests.db.yadocs.api.base import YaDocsFileS3DatasetTestBase +from dl_connector_bundle_chs3_tests.db.docs.api.base import YaDocsFileS3DatasetTestBase class TestYaDocsFileS3Dataset(YaDocsFileS3DatasetTestBase, CHS3DatasetTestSuite): diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/__init__.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/core/__init__.py similarity index 100% rename from lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/__init__.py rename to lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/core/__init__.py diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/base.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/core/base.py similarity index 92% rename from lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/base.py rename to lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/core/base.py index cdf62c464..d9db84000 100644 --- a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/base.py +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/core/base.py @@ -7,8 +7,8 @@ from dl_core_testing.fixtures.primitives import FixtureTableSpec from dl_connector_bundle_chs3.chs3_yadocs.core.constants import ( - CONNECTION_TYPE_YADOCS, - SOURCE_TYPE_YADOCS, + CONNECTION_TYPE_DOCS, + SOURCE_TYPE_DOCS, ) from dl_connector_bundle_chs3.chs3_yadocs.core.testing.connection import make_saved_yadocs_connection from dl_connector_bundle_chs3.chs3_yadocs.core.us_connection import YaDocsFileS3Connection @@ -16,8 +16,8 @@ class BaseYaDocsFileS3TestClass(BaseCHS3TestClass[YaDocsFileS3Connection]): - conn_type = CONNECTION_TYPE_YADOCS - source_type = SOURCE_TYPE_YADOCS + conn_type = CONNECTION_TYPE_DOCS + source_type = SOURCE_TYPE_DOCS @pytest.fixture(scope="function") def sample_file_data_source( diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/test_adapter.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/core/test_adapter.py similarity index 82% rename from lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/test_adapter.py rename to lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/core/test_adapter.py index c222b1ca4..965f96345 100644 --- a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/test_adapter.py +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/core/test_adapter.py @@ -2,7 +2,7 @@ from dl_connector_bundle_chs3.chs3_base.core.target_dto import BaseFileS3ConnTargetDTO from dl_connector_bundle_chs3.chs3_yadocs.core.adapter import AsyncYaDocsFileS3Adapter -from dl_connector_bundle_chs3_tests.db.yadocs.core.base import BaseYaDocsFileS3TestClass +from dl_connector_bundle_chs3_tests.db.docs.core.base import BaseYaDocsFileS3TestClass class TestAsyncYaDocsFileS3Adapter( diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/test_connection.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/core/test_connection.py similarity index 78% rename from lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/test_connection.py rename to lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/core/test_connection.py index b7b7b5161..740d8abe9 100644 --- a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/test_connection.py +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/core/test_connection.py @@ -2,7 +2,7 @@ from dl_connector_bundle_chs3.file.core.us_connection import FileS3Connection from dl_connector_bundle_chs3_tests.db.base.core.connection import CHS3ConnectionTestBase -from dl_connector_bundle_chs3_tests.db.yadocs.core.base import BaseYaDocsFileS3TestClass +from dl_connector_bundle_chs3_tests.db.docs.core.base import BaseYaDocsFileS3TestClass class TestYaDocsFileS3Connection(BaseYaDocsFileS3TestClass, CHS3ConnectionTestBase[FileS3Connection]): diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/test_connection_executor.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/core/test_connection_executor.py similarity index 85% rename from lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/test_connection_executor.py rename to lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/core/test_connection_executor.py index 746ab02b5..dd983dd01 100644 --- a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/test_connection_executor.py +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/core/test_connection_executor.py @@ -3,7 +3,7 @@ CHS3AsyncConnectionExecutorTestBase, CHS3SyncConnectionExecutorTestBase, ) -from dl_connector_bundle_chs3_tests.db.yadocs.core.base import BaseYaDocsFileS3TestClass +from dl_connector_bundle_chs3_tests.db.docs.core.base import BaseYaDocsFileS3TestClass class TestYaDocsFileS3SyncConnectionExecutor( diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/test_data_source.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/core/test_data_source.py similarity index 87% rename from lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/test_data_source.py rename to lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/core/test_data_source.py index 81939f7fe..e4d9650e9 100644 --- a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/test_data_source.py +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/core/test_data_source.py @@ -2,7 +2,7 @@ from dl_connector_bundle_chs3.chs3_yadocs.core.data_source_spec import YaDocsFileS3DataSourceSpec from dl_connector_bundle_chs3.chs3_yadocs.core.us_connection import YaDocsFileS3Connection from dl_connector_bundle_chs3_tests.db.base.core.data_source import CHS3TableDataSourceTestBase -from dl_connector_bundle_chs3_tests.db.yadocs.core.base import BaseYaDocsFileS3TestClass +from dl_connector_bundle_chs3_tests.db.docs.core.base import BaseYaDocsFileS3TestClass class TestYaDocsFileS3TableDataSource( diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/test_dataset.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/core/test_dataset.py similarity index 76% rename from lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/test_dataset.py rename to lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/core/test_dataset.py index 71912c931..c613aebec 100644 --- a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/yadocs/core/test_dataset.py +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/docs/core/test_dataset.py @@ -1,6 +1,6 @@ from dl_connector_bundle_chs3.chs3_yadocs.core.us_connection import YaDocsFileS3Connection from dl_connector_bundle_chs3_tests.db.base.core.dataset import CHS3DatasetTestBase -from dl_connector_bundle_chs3_tests.db.yadocs.core.base import BaseYaDocsFileS3TestClass +from dl_connector_bundle_chs3_tests.db.docs.core.base import BaseYaDocsFileS3TestClass class TestYaDocsFileS3Dataset(BaseYaDocsFileS3TestClass, CHS3DatasetTestBase[YaDocsFileS3Connection]): diff --git a/lib/dl_connector_bundle_chs3/pyproject.toml b/lib/dl_connector_bundle_chs3/pyproject.toml index 85d82504d..6f3029063 100644 --- a/lib/dl_connector_bundle_chs3/pyproject.toml +++ b/lib/dl_connector_bundle_chs3/pyproject.toml @@ -34,12 +34,12 @@ datalens-task-processor = {path = "../dl_task_processor"} [tool.poetry.plugins."dl_api_lib.connectors"] file = "dl_connector_bundle_chs3.file.api.connector:FileS3ApiConnector" gsheets_v2 = "dl_connector_bundle_chs3.chs3_gsheets.api.connector:GSheetsFileS3ApiConnector" -yadocs = "dl_connector_bundle_chs3.chs3_yadocs.api.connector:YaDocsFileS3ApiConnector" +docs = "dl_connector_bundle_chs3.chs3_yadocs.api.connector:YaDocsFileS3ApiConnector" [tool.poetry.plugins."dl_core.connectors"] file = "dl_connector_bundle_chs3.file.core.connector:FileS3CoreConnector" gsheets_v2 = "dl_connector_bundle_chs3.chs3_gsheets.core.connector:GSheetsFileS3CoreConnector" -yadocs = "dl_connector_bundle_chs3.chs3_yadocs.core.connector:YaDocsFileS3CoreConnector" +docs = "dl_connector_bundle_chs3.chs3_yadocs.core.connector:YaDocsFileS3CoreConnector" [tool.poetry.group.tests.dependencies] pytest = ">=7.2.2" @@ -63,9 +63,9 @@ target_path = "file" root_dir = "dl_connector_bundle_chs3_tests/db" target_path = "gsheets_v2" -[datalens.pytest.db_yadocs] +[datalens.pytest.db_docs] root_dir = "dl_connector_bundle_chs3_tests/db" -target_path = "yadocs" +target_path = "docs" [tool.mypy] warn_unused_configs = true diff --git a/lib/dl_core/dl_core/services_registry/file_uploader_client_factory.py b/lib/dl_core/dl_core/services_registry/file_uploader_client_factory.py index 41c8e0c21..53bb0f117 100644 --- a/lib/dl_core/dl_core/services_registry/file_uploader_client_factory.py +++ b/lib/dl_core/dl_core/services_registry/file_uploader_client_factory.py @@ -54,6 +54,7 @@ class YaDocsFileSourceDesc(FileSourceDesc): public_link: Optional[str] = attr.ib() private_path: Optional[str] = attr.ib() first_line_is_header: Optional[bool] = attr.ib() + sheet_id: Optional[str] = attr.ib() @attr.s(frozen=True) diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/files.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/files.py index 705cc5e28..9dc3e32e7 100644 --- a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/files.py +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/files.py @@ -1,8 +1,12 @@ from __future__ import annotations -from typing import Any +from typing import ( + Any, + Type, +) import marshmallow as ma +from marshmallow_oneofschema import OneOfSchema from dl_constants.enums import FileProcessingStatus from dl_file_uploader_api_lib.schemas.base import BaseRequestSchema @@ -14,7 +18,7 @@ from dl_file_uploader_lib.enums import FileType -def validate_authorized(data: dict) -> None: +def validate_authorized_gsheets(data: dict) -> None: if data["authorized"] and data["refresh_token"] is None and data["connection_id"] is None: raise ma.ValidationError( "Either refresh_token or connection_id must be provided when authorized is true", @@ -22,14 +26,19 @@ def validate_authorized(data: dict) -> None: ) +def validate_authorized_yadocs(data: dict) -> None: + if data["authorized"] and data["oauth_token"] is None and data["connection_id"] is None: + raise ma.ValidationError( + "Either oauth_token or connection_id must be provided when authorized is true", + "authorized", + ) + + def validate_docs_data(data): if not ((data["public_link"] is None) ^ (data["private_path"] is None)): raise ValueError("Expected exactly one of [`private_path`, `public_link`] to be specified") - if data["public_link"] is None: - if data["private_path"] is None: - raise ma.ValidationError("'private_path' must be provided for private files") - elif data["oauth_token"] is None and data["connection_id"] is None: - raise ma.ValidationError("Expected `oauth_token` or `connection_id` to be specified") + if data["public_link"] is None and data["oauth_token"] is None and data["connection_id"] is None: + raise ma.ValidationError("Expected `oauth_token` or `connection_id` to be specified") class FileLinkRequestSchema(BaseRequestSchema): @@ -41,7 +50,7 @@ class FileLinkRequestSchema(BaseRequestSchema): @ma.validates_schema(skip_on_field_errors=True) def validate_object(self, data: dict, **kwargs: Any) -> None: - validate_authorized(data) + validate_authorized_gsheets(data) class FileDocumentsRequestSchema(BaseRequestSchema): @@ -90,24 +99,59 @@ class FileSourcesResultSchema(ma.Schema): sources = ma.fields.Nested(SourceShortInfoSchema, many=True) -class UpdateConnectionDataRequestSchema(BaseRequestSchema): - class UpdateConnectionDataSourceSchema(BaseRequestSchema): - id = ma.fields.String() - title = ma.fields.String() - spreadsheet_id = ma.fields.String(load_default=None) - sheet_id = ma.fields.Integer(load_default=None) - first_line_is_header = ma.fields.Boolean(load_default=None) +class FileTypeOneOfSchema(OneOfSchema): + class Meta: + unknown = ma.EXCLUDE + + type_field_remove = False + + def get_obj_type(self, obj: dict[str, Any]) -> str: + type_field = obj[self.type_field] if isinstance(obj, dict) else getattr(obj, self.type_field) + assert isinstance(type_field, FileType) + return type_field.name + + def get_data_type(self, data): + data_type = data.get(self.type_field) + if self.type_field not in data: + data[self.type_field] = FileType.gsheets.value + data_type = FileType.gsheets.value + if self.type_field in data and self.type_field_remove: + data.pop(self.type_field) + return data_type + + +class UpdateConnectionDataSourceSchemaBase(BaseRequestSchema): + id = ma.fields.String() + title = ma.fields.String() + first_line_is_header = ma.fields.Boolean(load_default=None) + + +class UpdateConnectionDataSourceSchemaGSheets(UpdateConnectionDataSourceSchemaBase): + spreadsheet_id = ma.fields.String(load_default=None) + sheet_id = ma.fields.Integer(load_default=None) + + +class UpdateConnectionDataSourceSchemaYaDocs(UpdateConnectionDataSourceSchemaBase): + public_link = ma.fields.String(load_default=None) + private_path = ma.fields.String(load_default=None) + sheet_id = ma.fields.String(load_default=None) + +class UpdateConnectionDataRequestSchemaBase(BaseRequestSchema): + type = ma.fields.Enum(FileType) connection_id = ma.fields.String(allow_none=True, load_default=None) - refresh_token = ma.fields.String(allow_none=True, load_default=None) authorized = ma.fields.Boolean(required=True) save = ma.fields.Boolean(load_default=False) - sources = ma.fields.Nested(UpdateConnectionDataSourceSchema, many=True) tenant_id = ma.fields.String(allow_none=True, load_default=None) + +class UpdateConnectionDataRequestSchemaGSheets(UpdateConnectionDataRequestSchemaBase): + refresh_token = ma.fields.String(allow_none=True, load_default=None) + sources = ma.fields.Nested(UpdateConnectionDataSourceSchemaGSheets, many=True) + @ma.validates_schema(skip_on_field_errors=True) def validate_object(self, data: dict, **kwargs: Any) -> None: - validate_authorized(data) + validate_authorized_gsheets(data) incomplete_sources: list[dict[str, str]] = [] for src in data["sources"]: @@ -127,6 +171,43 @@ def validate_object(self, data: dict, **kwargs: Any) -> None: ) +class UpdateConnectionDataRequestSchemaYaDocs(UpdateConnectionDataRequestSchemaBase): + oauth_token = ma.fields.String(allow_none=True, load_default=None) + sources = ma.fields.Nested(UpdateConnectionDataSourceSchemaYaDocs, many=True) + + @ma.validates_schema(skip_on_field_errors=True) + def validate_object(self, data: dict, **kwargs: Any) -> None: + validate_authorized_yadocs(data) + + incomplete_sources: list[dict[str, str]] = [] + for src in data["sources"]: + if ( + (src["public_link"] is None and src["private_path"] is None) + or src["sheet_id"] is None + or src["first_line_is_header"] is None + ): + incomplete_sources.append( + dict( + source_id=src["id"], + title=src["title"], + ) + ) + + if incomplete_sources: + raise exc.CannotUpdateDataError( + details=dict( + incomplete_sources=incomplete_sources, + ) + ) + + +class UpdateConnectionDataRequestSchema(FileTypeOneOfSchema): + type_schemas: dict[str, Type[UpdateConnectionDataRequestSchemaBase]] = { + FileType.gsheets.name: UpdateConnectionDataRequestSchemaGSheets, + FileType.yadocs.name: UpdateConnectionDataRequestSchemaYaDocs, + } + + class UpdateConnectionDataResultSchema(ma.Schema): class FileSourcesSchema(ma.Schema): class SingleFileSourceSchema(ma.Schema): diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/sources.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/sources.py index bd672d960..f93553777 100644 --- a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/sources.py +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/schemas/sources.py @@ -102,12 +102,18 @@ class SourceInfoSchemaGSheets(SourceInfoSchemaBase): spreadsheet_id = ma.fields.String() +class SourceInfoSchemaYaDocs(SourceInfoSchemaBase): + sheet_id = ma.fields.String() + private_path = ma.fields.String(allow_none=True) + public_link = ma.fields.String(allow_none=True) + + class SourceInfoSchema(FileTypeOneOfSchema): type_schemas: dict[str, Type[SourceInfoSchemaBase]] = { FileType.csv.name: SourceInfoSchemaBase, FileType.gsheets.name: SourceInfoSchemaGSheets, FileType.xlsx.name: SourceInfoSchemaBase, - FileType.yadocs.name: SourceInfoSchemaBase, + FileType.yadocs.name: SourceInfoSchemaYaDocs, } diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/views/files.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/views/files.py index 048de30d6..0eb895d0f 100644 --- a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/views/files.py +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib/views/files.py @@ -38,6 +38,9 @@ GSheetsFileSourceSettings, GSheetsUserSourceDataSourceProperties, GSheetsUserSourceProperties, + YaDocsFileSourceSettings, + YaDocsUserSourceDataSourceProperties, + YaDocsUserSourceProperties, ) from dl_file_uploader_task_interface.tasks import ( DownloadGSheetTask, @@ -242,7 +245,7 @@ async def post(self) -> web.StreamResponse: rmm = self.dl_request.get_redis_model_manager() redis = self.dl_request.get_persistent_redis() - dfile_by_spreadsheet: dict[str, DataFile] = {} + dfile_by_source_properties: dict[str, DataFile] = {} for src in sources: source_lock_key, source_lock_token = get_update_connection_source_lock(src["id"]) LOGGER.info(f"Acquiring redis lock {source_lock_key}") @@ -253,38 +256,78 @@ async def post(self) -> web.StreamResponse: continue LOGGER.info(f"Lock {source_lock_key} acquired") - if src["spreadsheet_id"] not in dfile_by_spreadsheet: - dfile_by_spreadsheet[src["spreadsheet_id"]] = DataFile( - user_id=DataFile.system_user_id if self._is_internal else None, # filled from rci when None - manager=rmm, - filename="TITLE", + if req_data["type"] == FileType.gsheets: + if src["spreadsheet_id"] not in dfile_by_source_properties: + dfile_by_source_properties[src["spreadsheet_id"]] = DataFile( + user_id=DataFile.system_user_id if self._is_internal else None, # filled from rci when None + manager=rmm, + filename="TITLE", + status=FileProcessingStatus.in_progress, + file_type=FileType.gsheets, + sources=[], + user_source_properties=GSheetsUserSourceProperties( + spreadsheet_id=src["spreadsheet_id"], + refresh_token=req_data["refresh_token"], + ), + ) + LOGGER.info(f'Data file id: {dfile_by_source_properties[src["spreadsheet_id"]].id}') + + sheet_data_source = DataSource( + id=src["id"], + title=src["title"], + raw_schema=[], + file_source_settings=GSheetsFileSourceSettings( + first_line_is_header=src["first_line_is_header"], + raw_schema_header=[], + raw_schema_body=[], + ), + user_source_dsrc_properties=GSheetsUserSourceDataSourceProperties( + sheet_id=src["sheet_id"], + ), status=FileProcessingStatus.in_progress, - file_type=FileType.gsheets, - sources=[], - user_source_properties=GSheetsUserSourceProperties( - spreadsheet_id=src["spreadsheet_id"], - refresh_token=req_data["refresh_token"], + error=None, + ) + + dfile_by_source_properties[src["spreadsheet_id"]].sources.append(sheet_data_source) # type: ignore + + elif req_data["type"] == FileType.yadocs: + filled_source_property = "public_link" if src["public_link"] is not None else "private_path" + + if (src["public_link"] is None or src["public_link"] not in dfile_by_source_properties) and ( + src["private_path"] is None or src["private_path"] not in dfile_by_source_properties + ): + dfile_by_source_properties[src[filled_source_property]] = DataFile( + user_id=DataFile.system_user_id if self._is_internal else None, # filled from rci when None + manager=rmm, + filename="TITLE", + status=FileProcessingStatus.in_progress, + file_type=FileType.yadocs, + sources=[], + user_source_properties=YaDocsUserSourceProperties( + private_path=src["private_path"], + public_link=src["public_link"], + oauth_token=req_data["oauth_token"], + ), + ) + LOGGER.info(f"Data file id: {dfile_by_source_properties[src[filled_source_property]].id}") + + sheet_data_source = DataSource( + id=src["id"], + title=src["title"], + raw_schema=[], + file_source_settings=YaDocsFileSourceSettings( + first_line_is_header=src["first_line_is_header"], + raw_schema_header=[], + raw_schema_body=[], + ), + user_source_dsrc_properties=YaDocsUserSourceDataSourceProperties( + sheet_id=src["sheet_id"], ), + status=FileProcessingStatus.in_progress, + error=None, ) - LOGGER.info(f'Data file id: {dfile_by_spreadsheet[src["spreadsheet_id"]].id}') - - sheet_data_source = DataSource( - id=src["id"], - title=src["title"], - raw_schema=[], - file_source_settings=GSheetsFileSourceSettings( - first_line_is_header=src["first_line_is_header"], - raw_schema_header=[], - raw_schema_body=[], - ), - user_source_dsrc_properties=GSheetsUserSourceDataSourceProperties( - sheet_id=src["sheet_id"], - ), - status=FileProcessingStatus.in_progress, - error=None, - ) - dfile_by_spreadsheet[src["spreadsheet_id"]].sources.append(sheet_data_source) # type: ignore + dfile_by_source_properties[src[filled_source_property]].sources.append(sheet_data_source) # type: ignore task_processor = self.dl_request.get_task_processor() exec_mode = TaskExecutionMode.UPDATE_AND_SAVE if req_data["save"] else TaskExecutionMode.UPDATE_NO_SAVE @@ -294,21 +337,32 @@ async def post(self) -> web.StreamResponse: assert self.dl_request.rci.tenant is not None tenant_id = self.dl_request.rci.tenant.get_tenant_id() assert tenant_id is not None - for dfile in dfile_by_spreadsheet.values(): + for dfile in dfile_by_source_properties.values(): await dfile.save() - download_gsheet_task = DownloadGSheetTask( - file_id=dfile.id, - authorized=req_data["authorized"], - tenant_id=tenant_id, - connection_id=req_data["connection_id"], - exec_mode=exec_mode, - ) - await task_processor.schedule(download_gsheet_task) - LOGGER.info(f"Scheduled DownloadGSheetTask for file_id {dfile.id} (update connection)") + if req_data["type"] == FileType.gsheets: + download_gsheet_task = DownloadGSheetTask( + file_id=dfile.id, + authorized=req_data["authorized"], + tenant_id=tenant_id, + connection_id=req_data["connection_id"], + exec_mode=exec_mode, + ) + await task_processor.schedule(download_gsheet_task) + LOGGER.info(f"Scheduled DownloadGSheetTask for file_id {dfile.id} (update connection)") + elif req_data["type"] == FileType.yadocs: + download_yadocs_task = DownloadYaDocsTask( + file_id=dfile.id, + authorized=req_data["authorized"], + tenant_id=tenant_id, + connection_id=req_data["connection_id"], + exec_mode=exec_mode, + ) + await task_processor.schedule(download_yadocs_task) + LOGGER.info(f"Scheduled DownloadYaDocsTask for file_id {dfile.id} (update connection)") return web.json_response( - files_schemas.UpdateConnectionDataResultSchema().dump(dict(files=dfile_by_spreadsheet.values())) + files_schemas.UpdateConnectionDataResultSchema().dump(dict(files=dfile_by_source_properties.values())) ) diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/config.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/config.py index b999a9d75..94d51d3bc 100644 --- a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/config.py +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/config.py @@ -6,7 +6,7 @@ ) -CONNECTOR_WHITELIST = ["clickhouse", "file", "gsheets_v2"] +CONNECTOR_WHITELIST = ["clickhouse", "file", "gsheets_v2", "docs"] @attr.s(kw_only=True) diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/ext/test_update_data.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/ext/test_update_data_gsheets.py similarity index 100% rename from lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/ext/test_update_data.py rename to lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/ext/test_update_data_gsheets.py diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/ext/test_update_data_yadocs.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/ext/test_update_data_yadocs.py new file mode 100644 index 000000000..058dec1ac --- /dev/null +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/ext/test_update_data_yadocs.py @@ -0,0 +1,340 @@ +import asyncio +import datetime +import itertools +import logging +import uuid + +import pytest + +from dl_constants.enums import ( + FileProcessingStatus, + UserDataType, +) +from dl_core.db import SchemaColumn +from dl_core.us_manager.us_manager_async import AsyncUSManager +from dl_core_testing.connection import make_conn_key +from dl_file_uploader_api_lib_tests.req_builder import ReqBuilder +from dl_file_uploader_lib import exc +from dl_testing.s3_utils import s3_file_exists + +from dl_connector_bundle_chs3.chs3_gsheets.core.constants import CONNECTION_TYPE_GSHEETS_V2 +from dl_connector_bundle_chs3.chs3_gsheets.core.lifecycle import GSheetsFileS3ConnectionLifecycleManager +from dl_connector_bundle_chs3.chs3_gsheets.core.us_connection import GSheetsFileS3Connection +from dl_connector_bundle_chs3.chs3_yadocs.core.constants import CONNECTION_TYPE_DOCS +from dl_connector_bundle_chs3.chs3_yadocs.core.lifecycle import YaDocsFileS3ConnectionLifecycleManager +from dl_connector_bundle_chs3.chs3_yadocs.core.us_connection import YaDocsFileS3Connection + + +LOGGER = logging.getLogger(__name__) + + +@pytest.fixture(scope="function") +async def saved_yadocs_connection(loop, bi_context, default_async_usm_per_test, s3_persistent_bucket, s3_client): + us_manager = default_async_usm_per_test + conn_name = "docs test conn {}".format(uuid.uuid4()) + long_long_ago = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta( + seconds=YaDocsFileS3ConnectionLifecycleManager.STALE_THRESHOLD_SECONDS + 60, # just in case + ) + + dummy_raw_schema = [SchemaColumn("dummy_column", user_type=UserDataType.string)] + data = YaDocsFileS3Connection.DataModel( + sources=[ + YaDocsFileS3Connection.FileDataSource( # this is a valid source + id=f"source_1_{uuid.uuid4()}", + file_id=str(uuid.uuid4()), + title="Source 1", + status=FileProcessingStatus.ready, + s3_filename=f"src_1_filename_{uuid.uuid4()}", + raw_schema=dummy_raw_schema, + public_link="https://disk.yandex.ru/i/zr3TzLtWlTjYnw", + sheet_id="elaborate", + first_line_is_header=True, + data_updated_at=long_long_ago, + ), + YaDocsFileS3Connection.FileDataSource( # this sheet has no data + id=f"source_2_{uuid.uuid4()}", + file_id=str(uuid.uuid4()), + title="Source 2", + status=FileProcessingStatus.ready, + s3_filename=f"src_2_filename_{uuid.uuid4()}", + raw_schema=dummy_raw_schema, + public_link="https://disk.yandex.ru/i/zr3TzLtWlTjYnw", + sheet_id="image", + first_line_is_header=True, + data_updated_at=long_long_ago, + ), + YaDocsFileS3Connection.FileDataSource( # this sheet does not exist + id=f"source_3_{uuid.uuid4()}", + file_id=str(uuid.uuid4()), + title="Source 3", + status=FileProcessingStatus.ready, + s3_filename=f"src_3_filename_{uuid.uuid4()}", + raw_schema=dummy_raw_schema, + public_link="https://disk.yandex.ru/i/zr3TzLtWlTjYnw", + sheet_id="hello, world", + first_line_is_header=True, + data_updated_at=long_long_ago, + ), + YaDocsFileS3Connection.FileDataSource( # this sheet's whole spreadsheet does not exist + id=f"source_4_{uuid.uuid4()}", + file_id=str(uuid.uuid4()), + title="Source 4", + status=FileProcessingStatus.ready, + s3_filename=f"src_4_filename_{uuid.uuid4()}", + raw_schema=dummy_raw_schema, + public_link="https://disk.yandex.ru/i/1nxUuqeIoBvihQvvvv", + sheet_id="99999999999", + first_line_is_header=True, + data_updated_at=long_long_ago, + ), + YaDocsFileS3Connection.FileDataSource( # this is a valid source, but it failed during the previous update + id=f"source_5_{uuid.uuid4()}", + file_id=str(uuid.uuid4()), + title="Source 5", + status=FileProcessingStatus.failed, + s3_filename=f"src_5_filename_{uuid.uuid4()}", # removed below after file upload + raw_schema=dummy_raw_schema, + public_link="https://disk.yandex.ru/i/zr3TzLtWlTjYnw", + sheet_id="elaborate", + first_line_is_header=True, + data_updated_at=long_long_ago, + ), + YaDocsFileS3Connection.FileDataSource( # no access + id=f"source_6_{uuid.uuid4()}", + file_id=str(uuid.uuid4()), + title="Source 6", + status=FileProcessingStatus.ready, + s3_filename=f"src_6_filename_{uuid.uuid4()}", + raw_schema=dummy_raw_schema, + public_link="https://disk.yandex.ru/i/1nxUuqeIoBvihQ", + sheet_id="0", + first_line_is_header=True, + data_updated_at=long_long_ago, + ), + ] + ) + conn = YaDocsFileS3Connection.create_from_dict( + data, + ds_key=make_conn_key("connections", conn_name), + type_=CONNECTION_TYPE_DOCS.name, + meta={"title": conn_name, "state": "saved"}, + us_manager=us_manager, + ) + await us_manager.save(conn) + + csv_data = "f1,f2,f3\nqwe,123,45.9\nasd,345,47.9".encode("utf-8") + for src in conn.data.sources: + await s3_client.put_object( + ACL="private", + Bucket=s3_persistent_bucket, + Key=src.s3_filename, + Body=csv_data, + ) + + for src in conn.data.sources: # make sure all files are intact before the update + assert await s3_file_exists(s3_client, s3_persistent_bucket, src.s3_filename) + + # conn.data.sources[4].s3_filename = None + await us_manager.save(conn) + + yield conn + + updated_conn: YaDocsFileS3Connection = await us_manager.get_by_id(conn.uuid, expected_type=YaDocsFileS3Connection) + for src in itertools.chain( + conn.data.sources, updated_conn.data.sources + ): # cleanup original and updated files if any + if src.s3_filename is not None: + await s3_client.delete_object( + Bucket=s3_persistent_bucket, + Key=src.s3_filename, + ) + await us_manager.delete(conn) + + +@pytest.mark.asyncio +async def test_update_connection_data_with_save( + fu_client, + redis_cli, + default_async_usm_per_test: AsyncUSManager, + s3_client, + s3_tmp_bucket, + s3_persistent_bucket, + saved_yadocs_connection: YaDocsFileS3Connection, + reader_app, +): + conn = saved_yadocs_connection + usm = default_async_usm_per_test + data_updated_at_orig = conn.data.oldest_data_update_time(exclude_statuses={FileProcessingStatus.in_progress}) + + resp = await fu_client.make_request(ReqBuilder.update_conn_data(conn, save=True, file_type="yadocs")) + assert resp.status == 200 + + await asyncio.sleep(5) + + updated_conn: YaDocsFileS3Connection = await usm.get_by_id(conn.uuid, expected_type=YaDocsFileS3Connection) + + new_data_updated_at = updated_conn.data.oldest_data_update_time(exclude_statuses={FileProcessingStatus.in_progress}) + assert data_updated_at_orig != new_data_updated_at + + src_1 = updated_conn.get_file_source_by_id(conn.data.sources[0].id) # this is a valid source + assert src_1.file_id != conn.data.sources[0].file_id + assert await s3_file_exists(s3_client, s3_persistent_bucket, conn.data.sources[0].s3_filename) is False + assert await s3_file_exists(s3_client, s3_persistent_bucket, updated_conn.data.sources[0].s3_filename) is True + assert src_1.status == FileProcessingStatus.ready + assert src_1.data_updated_at != data_updated_at_orig + assert src_1.raw_schema is not None + + src_2 = updated_conn.get_file_source_by_id(conn.data.sources[1].id) # this sheet has no data + assert src_2.file_id == conn.data.sources[1].file_id + assert updated_conn.data.sources[1].s3_filename is None + assert await s3_file_exists(s3_client, s3_persistent_bucket, conn.data.sources[1].s3_filename) is False + assert src_2.status == FileProcessingStatus.failed + assert src_2.data_updated_at != data_updated_at_orig + assert src_2.raw_schema is not None + + src_3 = updated_conn.get_file_source_by_id(conn.data.sources[2].id) # this sheet does not exist + assert src_3.file_id == conn.data.sources[2].file_id + assert updated_conn.data.sources[2].s3_filename is None + assert await s3_file_exists(s3_client, s3_persistent_bucket, conn.data.sources[2].s3_filename) is False + assert src_3.status == FileProcessingStatus.failed + assert src_3.data_updated_at != data_updated_at_orig + assert src_3.raw_schema is not None + + src_4 = updated_conn.get_file_source_by_id(conn.data.sources[3].id) # this sheet's whole spreadsheet does not exist + assert src_4.file_id == conn.data.sources[3].file_id + assert updated_conn.data.sources[3].s3_filename is None + assert await s3_file_exists(s3_client, s3_persistent_bucket, conn.data.sources[3].s3_filename) is False + assert src_4.status == FileProcessingStatus.failed + assert src_4.data_updated_at != data_updated_at_orig + assert src_4.raw_schema is not None + + src_5 = updated_conn.get_file_source_by_id( + conn.data.sources[4].id + ) # this is a valid source, but it failed during the previous update + assert src_5.status == FileProcessingStatus.ready + assert src_5.data_updated_at != data_updated_at_orig + assert src_5.raw_schema is not None + + src_6 = updated_conn.get_file_source_by_id(conn.data.sources[5].id) # no access + assert src_6.file_id == conn.data.sources[5].file_id + assert updated_conn.data.sources[5].s3_filename is None + assert await s3_file_exists(s3_client, s3_persistent_bucket, conn.data.sources[5].s3_filename) is False + assert src_6.status == FileProcessingStatus.failed + assert src_6.data_updated_at != data_updated_at_orig + assert src_6.raw_schema is not None + + error_registry = updated_conn.data.component_errors + assert len(error_registry.items) == 4 + for err_pack in error_registry.items: + assert all("request-id" in err.details for err in err_pack.errors) + + +@pytest.mark.asyncio +async def test_update_connection_data_without_save( + fu_client, + redis_cli, + s3_tmp_bucket, + s3_persistent_bucket, + saved_yadocs_connection: YaDocsFileS3Connection, + reader_app, +): + conn = saved_yadocs_connection + resp = await fu_client.make_request(ReqBuilder.update_conn_data(conn, save=False, file_type="yadocs")) + assert resp.status == 200 + + file_ids = [file["file_id"] for file in resp.json["files"]] + assert len(file_ids) == 3 + + for file_id in file_ids: + resp = await fu_client.make_request(ReqBuilder.file_status(file_id)) + assert resp.status == 200 + assert resp.json["file_id"] == file_id + + await asyncio.sleep(5) + + # spreadsheet I + resp = await fu_client.make_request(ReqBuilder.file_status(file_ids[0])) + assert resp.status == 200 + assert resp.json["file_id"] == file_ids[0] + assert resp.json["status"] == "ready" + + resp = await fu_client.make_request(ReqBuilder.file_sources(file_ids[0])) + sources = resp.json["sources"] + assert sources[0]["is_applicable"] + assert sources[0]["error"] is None + + assert not sources[1]["is_applicable"] + assert sources[1]["error"]["code"] == "ERR.FILE.NO_DATA" + assert "request-id" in sources[1]["error"]["details"] + + assert not sources[2]["is_applicable"] + assert sources[2]["error"]["code"] == "ERR.FILE.NOT_FOUND" + + assert sources[3]["is_applicable"] + assert sources[3]["error"] is None + + # spreadsheet II + resp = await fu_client.make_request(ReqBuilder.file_status(file_ids[1])) + assert resp.status == 200 + assert resp.json["file_id"] == file_ids[1] + assert resp.json["status"] == "failed" + + resp = await fu_client.make_request(ReqBuilder.file_sources(file_ids[1])) + sources = resp.json["sources"] + assert not sources[0]["is_applicable"] + assert sources[0]["error"]["code"] == "ERR.FILE.NOT_FOUND" + + resp = await fu_client.make_request(ReqBuilder.source_status(file_ids[1], sources[0]["source_id"])) + assert resp.json["file_id"] == file_ids[1] + assert resp.json["status"] == "failed" + assert resp.json["error"]["code"] == "ERR.FILE.NOT_FOUND" + + # spreadsheet III + resp = await fu_client.make_request(ReqBuilder.file_status(file_ids[2])) + assert resp.status == 200 + assert resp.json["file_id"] == file_ids[2] + assert resp.json["status"] == "failed" + + resp = await fu_client.make_request(ReqBuilder.file_sources(file_ids[2])) + sources = resp.json["sources"] + assert not sources[0]["is_applicable"] + assert sources[0]["error"]["code"] == "ERR.FILE.NOT_FOUND" + + resp = await fu_client.make_request(ReqBuilder.source_status(file_ids[2], sources[0]["source_id"])) + assert resp.json["file_id"] == file_ids[2] + assert resp.json["status"] == "failed" + assert resp.json["error"]["code"] == "ERR.FILE.NOT_FOUND" + + +@pytest.mark.asyncio +async def test_update_in_progress_sources( + fu_client, + default_async_usm_per_test: AsyncUSManager, + redis_cli, + s3_tmp_bucket, + s3_persistent_bucket, + saved_yadocs_connection: YaDocsFileS3Connection, + reader_app, +): + usm = default_async_usm_per_test + conn = saved_yadocs_connection + + # making the source incomplete + conn.data.sources[0].sheet_id = None + conn.data.sources[0].status = FileProcessingStatus.in_progress + await usm.save(conn) + + resp = await fu_client.make_request( + ReqBuilder.update_conn_data( + conn, + save=False, + require_ok=False, + file_type="yadocs", + ) + ) + assert resp.status == 400 + + assert resp.json["message"] == exc.CannotUpdateDataError.default_message + assert resp.json["details"] == { + "incomplete_sources": [{"source_id": conn.data.sources[0].id, "title": conn.data.sources[0].title}] + } diff --git a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/req_builder.py b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/req_builder.py index 46fa5ee23..ef472936b 100644 --- a/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/req_builder.py +++ b/lib/dl_file_uploader_api_lib/dl_file_uploader_api_lib_tests/req_builder.py @@ -13,6 +13,7 @@ ) from dl_connector_bundle_chs3.chs3_gsheets.core.us_connection import GSheetsFileS3Connection +from dl_connector_bundle_chs3.chs3_yadocs.core.us_connection import YaDocsFileS3Connection class ReqBuilder: @@ -167,12 +168,36 @@ def internal_params( @classmethod def update_conn_data( cls, - connection: GSheetsFileS3Connection, + connection: GSheetsFileS3Connection | YaDocsFileS3Connection, save: bool, + file_type: Optional[str] = "gsheets", *, require_ok: bool = True, ) -> Req: sources_desc = [src.get_desc() for src in connection.data.sources] + sources = list() + if file_type == "gsheets": + sources = [ + dict( + id=src_desc.source_id, + title=src_desc.title, + spreadsheet_id=src_desc.spreadsheet_id, + sheet_id=src_desc.sheet_id, + first_line_is_header=src_desc.first_line_is_header, + ) + for src_desc in sources_desc + ] + elif file_type == "yadocs": + sources = [ + dict( + id=src_desc.source_id, + title=src_desc.title, + public_link=src_desc.public_link, + sheet_id=src_desc.sheet_id, + first_line_is_header=src_desc.first_line_is_header, + ) + for src_desc in sources_desc + ] return Req( method="post", url="/api/v2/update_connection_data", @@ -180,16 +205,8 @@ def update_conn_data( "connection_id": connection.uuid, "authorized": False, "save": save, - "sources": [ - dict( - id=src_desc.source_id, - title=src_desc.title, - spreadsheet_id=src_desc.spreadsheet_id, - sheet_id=src_desc.sheet_id, - first_line_is_header=src_desc.first_line_is_header, - ) - for src_desc in sources_desc - ], + "type": file_type, + "sources": sources, }, require_ok=require_ok, ) diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/__init__.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/__init__.py index 7922a09f9..6ca7733e1 100644 --- a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/__init__.py +++ b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/__init__.py @@ -19,6 +19,8 @@ RenameTenantStatusModel, SourceNotFoundError, SpreadsheetFileSourceSettings, + YaDocsFileSourceSettings, + YaDocsUserSourceDataSourceProperties, YaDocsUserSourceProperties, ) from .storage_schemas import ( @@ -47,7 +49,9 @@ "EmptySourcesError", "RenameTenantStatusModel", "PreviewSet", + "YaDocsFileSourceSettings", "YaDocsUserSourceProperties", + "YaDocsUserSourceDataSourceProperties", ) diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/models.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/models.py index 6f2711d45..5ea465b25 100644 --- a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/models.py +++ b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/models.py @@ -81,6 +81,11 @@ class GSheetsFileSourceSettings(SpreadsheetFileSourceSettings): file_type: FileType = attr.ib(default=FileType.gsheets) +@attr.s(init=True, kw_only=True) +class YaDocsFileSourceSettings(SpreadsheetFileSourceSettings): + file_type: FileType = attr.ib(default=FileType.yadocs) + + @attr.s(init=True, kw_only=True) class ExcelFileSourceSettings(SpreadsheetFileSourceSettings): file_type: FileType = attr.ib(default=FileType.xlsx) @@ -127,6 +132,13 @@ class GSheetsUserSourceDataSourceProperties(UserSourceDataSourceProperties): sheet_id: int = attr.ib() +@attr.s(init=True, kw_only=True) +class YaDocsUserSourceDataSourceProperties(UserSourceDataSourceProperties): + file_type: FileType = attr.ib(default=FileType.yadocs) + + sheet_id: str = attr.ib() + + @attr.s(init=True, kw_only=True) class ParsingError: code: str = attr.ib() diff --git a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/storage_schemas.py b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/storage_schemas.py index 23f2031ed..247eec44f 100644 --- a/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/storage_schemas.py +++ b/lib/dl_file_uploader_lib/dl_file_uploader_lib/redis_model/models/storage_schemas.py @@ -35,6 +35,8 @@ GSheetsUserSourceDataSourceProperties, GSheetsUserSourceProperties, RenameTenantStatusModel, + YaDocsFileSourceSettings, + YaDocsUserSourceDataSourceProperties, YaDocsUserSourceProperties, ) @@ -123,6 +125,15 @@ class Meta(BaseSchema.Meta): raw_schema_body = fields.Nested(SchemaColumnStorageSchema, many=True) +class YaDocsFileSourceSettingsSchema(FileSourceSettingsBaseSchema): + class Meta(BaseSchema.Meta): + target = YaDocsFileSourceSettings + + first_line_is_header = fields.Boolean() + raw_schema_header = fields.Nested(SchemaColumnStorageSchema, many=True) + raw_schema_body = fields.Nested(SchemaColumnStorageSchema, many=True) + + class ExcelFileSourceSettingsSchema(FileSourceSettingsBaseSchema): class Meta(BaseSchema.Meta): target = ExcelFileSourceSettings @@ -137,6 +148,7 @@ class FileSourceSettingsSchema(FileTypeOneOfSchema): FileType.csv.name: CSVFileSourceSettingsSchema, FileType.gsheets.name: GSheetsFileSourceSettingsSchema, FileType.xlsx.name: ExcelFileSourceSettingsSchema, + FileType.yadocs.name: YaDocsFileSourceSettingsSchema, } @@ -179,9 +191,17 @@ class Meta(BaseSchema.Meta): sheet_id = fields.Integer() +class YaDocsUserSourceDataSourcePropertiesSchema(UserSourceDataSourcePropertiesBaseSchema): + class Meta(BaseSchema.Meta): + target = YaDocsUserSourceDataSourceProperties + + sheet_id = fields.String() + + class UserSourceDataSourcePropertiesSchema(FileTypeOneOfSchema): type_schemas: dict[str, Type[UserSourceDataSourcePropertiesBaseSchema]] = { FileType.gsheets.name: GSheetsUserSourceDataSourcePropertiesSchema, + FileType.yadocs.name: YaDocsUserSourceDataSourcePropertiesSchema, } diff --git a/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py b/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py index 38ac6e640..7bbd28a4c 100644 --- a/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py +++ b/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/tasks.py @@ -37,6 +37,7 @@ class DownloadYaDocsTask(BaseTaskMeta): tenant_id: Optional[str] = attr.ib(default=None) connection_id: Optional[str] = attr.ib(default=None) + exec_mode: TaskExecutionMode = attr.ib(default=TaskExecutionMode.BASIC) @attr.s @@ -58,6 +59,9 @@ class ProcessExcelTask(BaseTaskMeta): name = TaskName("process_excel") file_id: str = attr.ib() + exec_mode: Optional[TaskExecutionMode] = attr.ib(default=TaskExecutionMode.BASIC) + tenant_id: Optional[str] = attr.ib(default=None) + connection_id: Optional[str] = attr.ib(default=None) @attr.s diff --git a/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/utils_service_registry.py b/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/utils_service_registry.py index 60e037b5e..6cd2781a0 100644 --- a/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/utils_service_registry.py +++ b/lib/dl_file_uploader_task_interface/dl_file_uploader_task_interface/utils_service_registry.py @@ -20,6 +20,7 @@ from dl_file_uploader_worker_lib.settings import FileUploaderConnectorsSettings from dl_connector_bundle_chs3.chs3_gsheets.core.constants import CONNECTION_TYPE_GSHEETS_V2 +from dl_connector_bundle_chs3.chs3_yadocs.core.constants import CONNECTION_TYPE_DOCS from dl_connector_bundle_chs3.file.core.constants import CONNECTION_TYPE_FILE from dl_connector_clickhouse.core.clickhouse_base.conn_options import CHConnectOptions @@ -48,6 +49,7 @@ def get_conn_options(conn: ExecutorBasedMixin) -> Optional[ConnectOptions]: connectors_settings = { CONNECTION_TYPE_FILE: connectors_settings.FILE, CONNECTION_TYPE_GSHEETS_V2: connectors_settings.FILE, + CONNECTION_TYPE_DOCS: connectors_settings.FILE, } return DefaultSRFactory( rqe_config=rqe_config_from_env(), diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/cleanup.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/cleanup.py index b43c83142..837c0f172 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/cleanup.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/cleanup.py @@ -36,6 +36,7 @@ from dl_connector_bundle_chs3.chs3_base.core.us_connection import BaseFileS3Connection from dl_connector_bundle_chs3.chs3_gsheets.core.constants import CONNECTION_TYPE_GSHEETS_V2 +from dl_connector_bundle_chs3.chs3_yadocs.core.constants import CONNECTION_TYPE_DOCS from dl_connector_bundle_chs3.file.core.constants import CONNECTION_TYPE_FILE @@ -231,6 +232,7 @@ async def run(self) -> TaskResult: s3_file_based_conn_types = ( CONNECTION_TYPE_FILE, CONNECTION_TYPE_GSHEETS_V2, + CONNECTION_TYPE_DOCS, ) redis = self._ctx.redis_service.get_redis() diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py index bb7f52944..140872857 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/download_yadocs.py @@ -28,6 +28,8 @@ ) from dl_file_uploader_task_interface.context import FileUploaderTaskContext import dl_file_uploader_task_interface.tasks as task_interface +from dl_file_uploader_task_interface.tasks import TaskExecutionMode +from dl_file_uploader_worker_lib.utils.connection_error_tracker import FileConnectionDataSourceErrorTracker from dl_task_processor.task import ( BaseExecutorTask, Fail, @@ -73,7 +75,7 @@ async def run(self) -> TaskResult: redis = self._ctx.redis_service.get_redis() task_processor = self._ctx.make_task_processor(self._request_id) usm = self._ctx.get_async_usm() - + connection_error_tracker = FileConnectionDataSourceErrorTracker(usm, task_processor, redis, self._request_id) try: rmm = RedisModelManager(redis=redis, crypto_keys_config=self._ctx.crypto_keys_config) dfile = await DataFile.get(manager=rmm, obj_id=self.meta.file_id) @@ -117,7 +119,14 @@ async def run(self) -> TaskResult: download_error = FileProcessingError.from_exception(e) dfile.status = FileProcessingStatus.failed dfile.error = download_error + if self.meta.exec_mode != TaskExecutionMode.BASIC: + for src in dfile.sources: + src.status = FileProcessingStatus.failed + src.error = download_error + connection_error_tracker.add_error(src.id, src.error) await dfile.save() + + await connection_error_tracker.finalize(self.meta.exec_mode, self.meta.connection_id) return Success() dfile.filename = spreadsheet_meta["name"] @@ -154,7 +163,14 @@ async def _chunk_iter(chunk_size: int = 10 * 1024 * 1024) -> AsyncGenerator[byte await dfile.save() LOGGER.info(f'Uploaded file "{dfile.filename}".') - await task_processor.schedule(task_interface.ProcessExcelTask(file_id=dfile.id)) + await task_processor.schedule( + task_interface.ProcessExcelTask( + file_id=dfile.id, + tenant_id=self.meta.tenant_id, + connection_id=self.meta.connection_id, + exec_mode=self.meta.exec_mode, + ) + ) LOGGER.info(f"Scheduled ProcessExcelTask for file_id {dfile.id}") except Exception as ex: diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/excel.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/excel.py index d37482e22..fbb6744c8 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/excel.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/excel.py @@ -1,7 +1,10 @@ import asyncio +from collections import defaultdict +import itertools import logging import ssl from typing import ( + Iterable, Iterator, Optional, ) @@ -15,15 +18,20 @@ from dl_core.raw_data_streaming.stream import SimpleUntypedDataStream from dl_file_uploader_lib import exc from dl_file_uploader_lib.data_sink.json_each_row import S3JsonEachRowUntypedFileDataSink +from dl_file_uploader_lib.enums import FileType from dl_file_uploader_lib.redis_model.base import RedisModelManager from dl_file_uploader_lib.redis_model.models import ( DataFile, DataSource, ExcelFileSourceSettings, FileProcessingError, + YaDocsFileSourceSettings, + YaDocsUserSourceDataSourceProperties, ) from dl_file_uploader_task_interface.context import FileUploaderTaskContext import dl_file_uploader_task_interface.tasks as task_interface +from dl_file_uploader_task_interface.tasks import TaskExecutionMode +from dl_file_uploader_worker_lib.utils.connection_error_tracker import FileConnectionDataSourceErrorTracker from dl_file_uploader_worker_lib.utils.parsing_utils import guess_header_and_schema_excel from dl_task_processor.task import ( BaseExecutorTask, @@ -45,6 +53,12 @@ class ProcessExcelTask(BaseExecutorTask[task_interface.ProcessExcelTask, FileUpl async def run(self) -> TaskResult: dfile: Optional[DataFile] = None + sources_to_update_by_sheet_id: dict[int, list[DataSource]] = defaultdict(list) + usm = self._ctx.get_async_usm() + task_processor = self._ctx.make_task_processor(self._request_id) + redis = self._ctx.redis_service.get_redis() + connection_error_tracker = FileConnectionDataSourceErrorTracker(usm, task_processor, redis, self._request_id) + try: LOGGER.info(f"ProcessExcelTask. File: {self.meta.file_id}") loop = asyncio.get_running_loop() @@ -56,7 +70,9 @@ async def run(self) -> TaskResult: s3 = self._ctx.s3_service - dfile.sources = [] + if self.meta.exec_mode == TaskExecutionMode.BASIC: + dfile.sources = [] + assert dfile.sources is not None s3_resp = await s3.client.get_object( Bucket=s3.tmp_bucket_name, @@ -87,6 +103,9 @@ async def run(self) -> TaskResult: ) as resp: file_data = await resp.json() + for src in dfile.sources: + sources_to_update_by_sheet_id[src.user_source_dsrc_properties.sheet_id].append(src) + for spreadsheet in file_data: sheetname = spreadsheet["sheetname"] sheetdata = spreadsheet["data"] @@ -95,63 +114,109 @@ async def run(self) -> TaskResult: raw_schema: list[SchemaColumn] = [] raw_schema_header: list[SchemaColumn] = [] raw_schema_body: list[SchemaColumn] = [] - source_title = f"{dfile.filename} – {sheetname}" - sheet_data_source = DataSource( - title=source_title, - raw_schema=raw_schema, - status=source_status, - error=None, - ) - dfile.sources.append(sheet_data_source) + + if self.meta.exec_mode == TaskExecutionMode.BASIC: + source_title = f"{dfile.filename} – {sheetname}" + if dfile.file_type == FileType.yadocs: + sheet_data_sources = [ + DataSource( + title=source_title, + raw_schema=raw_schema, + status=source_status, + user_source_dsrc_properties=YaDocsUserSourceDataSourceProperties(sheet_id=sheetname), + error=None, + ) + ] + else: + sheet_data_sources = [ + DataSource( + title=source_title, + raw_schema=raw_schema, + status=source_status, + error=None, + ) + ] + dfile.sources.extend(sheet_data_sources) + else: + if sheetname not in sources_to_update_by_sheet_id: + continue + sheet_data_sources = sources_to_update_by_sheet_id.pop(sheetname) if not sheetdata: - sheet_data_source.error = FileProcessingError.from_exception(exc.EmptyDocument()) - sheet_data_source.status = FileProcessingStatus.failed + for src in sheet_data_sources: + src.error = FileProcessingError.from_exception(exc.EmptyDocument()) + src.status = FileProcessingStatus.failed + connection_error_tracker.add_error(src.id, src.error) else: try: has_header, raw_schema, raw_schema_header, raw_schema_body = guess_header_and_schema_excel( sheetdata ) except Exception as ex: - sheet_data_source.status = FileProcessingStatus.failed - exc_to_save = ex if isinstance(ex, exc.DLFileUploaderBaseError) else exc.ParseFailed() - sheet_data_source.error = FileProcessingError.from_exception(exc_to_save) + for src in sheet_data_sources: + src.status = FileProcessingStatus.failed + exc_to_save = ex if isinstance(ex, exc.DLFileUploaderBaseError) else exc.ParseFailed() + src.error = FileProcessingError.from_exception(exc_to_save) + connection_error_tracker.add_error(src.id, src.error) sheet_settings = None - if sheet_data_source.is_applicable: - - def data_iter() -> Iterator[list]: - row_iter = iter(sheetdata) - for row in row_iter: - values = [cell["value"] for cell in row] - yield values - - data_stream = SimpleUntypedDataStream( - data_iter=data_iter(), - rows_to_copy=None, # TODO - ) - with S3JsonEachRowUntypedFileDataSink( - s3=s3.get_sync_client(), - s3_key=sheet_data_source.s3_key, - bucket_name=s3.tmp_bucket_name, - ) as data_sink: - data_sink.dump_data_stream(data_stream) - - assert has_header is not None - sheet_settings = ExcelFileSourceSettings( - first_line_is_header=has_header, - raw_schema_header=raw_schema_header, - raw_schema_body=raw_schema_body, - ) - - sheet_data_source.raw_schema = raw_schema - sheet_data_source.file_source_settings = sheet_settings - + for src in sheet_data_sources: + if src.is_applicable: + if self.meta.exec_mode != TaskExecutionMode.BASIC: + assert src.file_source_settings is not None + has_header = src.file_source_settings.first_line_is_header + assert has_header is not None + + def data_iter() -> Iterator[list]: + row_iter = iter(sheetdata) + for row in row_iter: + values = [cell["value"] for cell in row] + yield values + + data_stream = SimpleUntypedDataStream( + data_iter=data_iter(), + rows_to_copy=None, # TODO + ) + with S3JsonEachRowUntypedFileDataSink( + s3=s3.get_sync_client(), + s3_key=src.s3_key, + bucket_name=s3.tmp_bucket_name, + ) as data_sink: + data_sink.dump_data_stream(data_stream) + + assert has_header is not None + if dfile.file_type == FileType.yadocs: + sheet_settings = YaDocsFileSourceSettings( + first_line_is_header=has_header, + raw_schema_header=raw_schema_header, + raw_schema_body=raw_schema_body, + ) + else: + sheet_settings = ExcelFileSourceSettings( + first_line_is_header=has_header, + raw_schema_header=raw_schema_header, + raw_schema_body=raw_schema_body, + ) + src.raw_schema = raw_schema + src.file_source_settings = sheet_settings + + not_found_sources: Iterable[DataSource] = itertools.chain(*sources_to_update_by_sheet_id.values()) + for src in not_found_sources: + src.error = FileProcessingError.from_exception(exc.DocumentNotFound()) + src.status = FileProcessingStatus.failed + connection_error_tracker.add_error(src.id, src.error) + + await connection_error_tracker.finalize(self.meta.exec_mode, self.meta.connection_id) await dfile.save() LOGGER.info("DataFile object saved.") task_processor = self._ctx.make_task_processor(self._request_id) - parse_file_task = task_interface.ParseFileTask(file_id=dfile.id) + parse_file_task = task_interface.ParseFileTask( + file_id=dfile.id, + tenant_id=self.meta.tenant_id, + connection_id=self.meta.connection_id, + exec_mode=self.meta.exec_mode, + ) await task_processor.schedule(parse_file_task) LOGGER.info(f"Scheduled ParseFileTask for file_id {dfile.id}") @@ -164,5 +229,12 @@ def data_iter() -> Iterator[list]: exc_to_save = ex if isinstance(ex, exc.DLFileUploaderBaseError) else exc.ParseFailed() dfile.error = FileProcessingError.from_exception(exc_to_save) await dfile.save() + + for src in dfile.sources or (): + connection_error_tracker.add_error(src.id, dfile.error) + await connection_error_tracker.finalize(self.meta.exec_mode, self.meta.connection_id) + return Fail() + finally: + await usm.close() return Success() diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/save.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/save.py index e27b075fd..0e8fa5d75 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/save.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/tasks/save.py @@ -30,6 +30,9 @@ GSheetsUserSourceDataSourceProperties, GSheetsUserSourceProperties, PreviewSet, + YaDocsFileSourceSettings, + YaDocsUserSourceDataSourceProperties, + YaDocsUserSourceProperties, ) from dl_file_uploader_task_interface.context import FileUploaderTaskContext import dl_file_uploader_task_interface.tasks as task_interface @@ -110,6 +113,23 @@ def _get_conn_specific_dsrc_params(dfile: DataFile, src: DataSource) -> dict[str first_line_is_header=file_source_settings.first_line_is_header, ) ) + + if dfile.file_type == FileType.yadocs: + file_source_settings = src.file_source_settings + assert isinstance(file_source_settings, YaDocsFileSourceSettings) + user_source_properties = dfile.user_source_properties + assert isinstance(user_source_properties, YaDocsUserSourceProperties) + user_source_dsrc_properties = src.user_source_dsrc_properties + assert isinstance(user_source_dsrc_properties, YaDocsUserSourceDataSourceProperties) + + kwargs.update( + dict( + public_link=user_source_properties.public_link, + private_path=user_source_properties.private_path, + sheet_id=user_source_dsrc_properties.sheet_id, + first_line_is_header=file_source_settings.first_line_is_header, + ) + ) return kwargs diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/utils/parsing_utils.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/utils/parsing_utils.py index 4f0b081bd..00f95439e 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/utils/parsing_utils.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib/utils/parsing_utils.py @@ -43,7 +43,7 @@ ) from dl_connector_bundle_chs3.chs3_gsheets.core.constants import CONNECTION_TYPE_GSHEETS_V2 -from dl_connector_bundle_chs3.chs3_yadocs.core.constants import CONNECTION_TYPE_YADOCS +from dl_connector_bundle_chs3.chs3_yadocs.core.constants import CONNECTION_TYPE_DOCS from dl_connector_bundle_chs3.file.core.constants import CONNECTION_TYPE_FILE @@ -171,7 +171,7 @@ def get_field_id_generator(conn_type: ConnectionType) -> FileUploaderFieldIdGene field_id_gen_cls_map: dict[ConnectionType, Type[FileUploaderFieldIdGenerator]] = { CONNECTION_TYPE_FILE: FileFieldIdGenerator, CONNECTION_TYPE_GSHEETS_V2: GSheetsFieldIdGenerator, - CONNECTION_TYPE_YADOCS: YaDocsFieldIdGenerator, + CONNECTION_TYPE_DOCS: YaDocsFieldIdGenerator, } if conn_type not in field_id_gen_cls_map: @@ -275,6 +275,7 @@ def merge_raw_schemas_spreadsheet( conn_type_map: dict[FileType, ConnectionType] = { FileType.xlsx: CONNECTION_TYPE_FILE, FileType.gsheets: CONNECTION_TYPE_GSHEETS_V2, + FileType.yadocs: CONNECTION_TYPE_DOCS, } col_types_header = raw_schema_to_column_types(header_rs) col_types_body = raw_schema_to_column_types(body_rs) diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/config.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/config.py index 6ba5e3e61..cf39b9dcf 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/config.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/config.py @@ -6,7 +6,7 @@ ) -CONNECTOR_WHITELIST = ["clickhouse", "file", "gsheets_v2"] +CONNECTOR_WHITELIST = ["clickhouse", "file", "gsheets_v2", "docs"] @attr.s(kw_only=True) diff --git a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/conftest.py b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/conftest.py index 739bf974a..2c329938a 100644 --- a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/conftest.py +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/conftest.py @@ -1,5 +1,6 @@ import os +import aiohttp.web import pytest from dl_configs.crypto_keys import get_dummy_crypto_keys_config @@ -7,6 +8,7 @@ GoogleAppSettings, S3Settings, ) +from dl_file_secure_reader_lib.app import create_app as create_reader_app from dl_file_uploader_worker_lib.settings import FileUploaderWorkerSettings from dl_testing.env_params.generic import GenericEnvParamGetter @@ -51,3 +53,12 @@ def file_uploader_worker_settings( SECURE_READER=secure_reader, ) yield settings + + +@pytest.fixture(scope="function") +def reader_app(loop, secure_reader): + current_app = create_reader_app() + runner = aiohttp.web.AppRunner(current_app) + loop.run_until_complete(runner.setup()) + site = aiohttp.web.UnixSite(runner, path=secure_reader.SOCKET) + return loop.run_until_complete(site.start()) diff --git "a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yado\321\201s.py" b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yadocs.py similarity index 98% rename from "lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yado\321\201s.py" rename to lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yadocs.py index d6f45bbad..69315c6e6 100644 --- "a/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yado\321\201s.py" +++ b/lib/dl_file_uploader_worker_lib/dl_file_uploader_worker_lib_tests/ext/test_yadocs.py @@ -18,6 +18,7 @@ async def test_download_yadocs_task( s3_client, redis_model_manager, s3_tmp_bucket, + reader_app, ): df = DataFile( filename="",