From 5428e0dfb13cd1887284bd037ddd7c64a338e99d Mon Sep 17 00:00:00 2001 From: Andrey Snytin Date: Fri, 3 Nov 2023 17:55:42 +0100 Subject: [PATCH] Add dl_maintenance package (#68) --- lib/dl_maintenance/LICENSE | 201 ++++++++++ lib/dl_maintenance/README.md | 3 + lib/dl_maintenance/dl_maintenance/__init__.py | 0 .../dl_maintenance/api/__init__.py | 0 .../dl_maintenance/api/common.py | 63 +++ .../dl_maintenance/api/crawler_runner.py | 54 +++ .../dl_maintenance/api/crawlers/__init__.py | 0 .../dl_maintenance/api/data_access.py | 237 +++++++++++ .../dl_maintenance/api/errors.py | 90 +++++ .../dl_maintenance/api/helpers.py | 43 ++ .../dl_maintenance/api/updates.py | 181 +++++++++ .../dl_maintenance/core/__init__.py | 0 .../dl_maintenance/core/common.py | 65 +++ .../dl_maintenance/core/crawlers/__init__.py | 0 .../core/crawlers/config_metric_collector.py | 306 ++++++++++++++ .../core/crawlers/crypto_keys_rotation.py | 58 +++ .../dl_maintenance/core/logging_config.py | 20 + .../dl_maintenance/core/us_crawler_base.py | 373 ++++++++++++++++++ .../dl_maintenance/diff_utils.py | 190 +++++++++ lib/dl_maintenance/dl_maintenance/py.typed | 0 .../dl_maintenance_tests/__init__.py | 0 .../dl_maintenance_tests/unit/__init__.py | 0 .../dl_maintenance_tests/unit/conftest.py | 0 lib/dl_maintenance/pyproject.toml | 48 +++ metapkg/poetry.lock | 44 ++- metapkg/pyproject.toml | 1 + 26 files changed, 1976 insertions(+), 1 deletion(-) create mode 100644 lib/dl_maintenance/LICENSE create mode 100644 lib/dl_maintenance/README.md create mode 100644 lib/dl_maintenance/dl_maintenance/__init__.py create mode 100644 lib/dl_maintenance/dl_maintenance/api/__init__.py create mode 100644 lib/dl_maintenance/dl_maintenance/api/common.py create mode 100644 lib/dl_maintenance/dl_maintenance/api/crawler_runner.py create mode 100644 lib/dl_maintenance/dl_maintenance/api/crawlers/__init__.py create mode 100644 lib/dl_maintenance/dl_maintenance/api/data_access.py create mode 100644 lib/dl_maintenance/dl_maintenance/api/errors.py create mode 100644 lib/dl_maintenance/dl_maintenance/api/helpers.py create mode 100644 lib/dl_maintenance/dl_maintenance/api/updates.py create mode 100644 lib/dl_maintenance/dl_maintenance/core/__init__.py create mode 100644 lib/dl_maintenance/dl_maintenance/core/common.py create mode 100644 lib/dl_maintenance/dl_maintenance/core/crawlers/__init__.py create mode 100644 lib/dl_maintenance/dl_maintenance/core/crawlers/config_metric_collector.py create mode 100644 lib/dl_maintenance/dl_maintenance/core/crawlers/crypto_keys_rotation.py create mode 100644 lib/dl_maintenance/dl_maintenance/core/logging_config.py create mode 100644 lib/dl_maintenance/dl_maintenance/core/us_crawler_base.py create mode 100644 lib/dl_maintenance/dl_maintenance/diff_utils.py create mode 100644 lib/dl_maintenance/dl_maintenance/py.typed create mode 100644 lib/dl_maintenance/dl_maintenance_tests/__init__.py create mode 100644 lib/dl_maintenance/dl_maintenance_tests/unit/__init__.py create mode 100644 lib/dl_maintenance/dl_maintenance_tests/unit/conftest.py create mode 100644 lib/dl_maintenance/pyproject.toml diff --git a/lib/dl_maintenance/LICENSE b/lib/dl_maintenance/LICENSE new file mode 100644 index 000000000..74ba5f6c7 --- /dev/null +++ b/lib/dl_maintenance/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. 
+ + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. 
This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2023 YANDEX LLC + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
diff --git a/lib/dl_maintenance/README.md b/lib/dl_maintenance/README.md new file mode 100644 index 000000000..094ddd09e --- /dev/null +++ b/lib/dl_maintenance/README.md @@ -0,0 +1,3 @@ +# dl_maintenance + +Maintenance tools diff --git a/lib/dl_maintenance/dl_maintenance/__init__.py b/lib/dl_maintenance/dl_maintenance/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lib/dl_maintenance/dl_maintenance/api/__init__.py b/lib/dl_maintenance/dl_maintenance/api/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lib/dl_maintenance/dl_maintenance/api/common.py b/lib/dl_maintenance/dl_maintenance/api/common.py new file mode 100644 index 000000000..d252ece73 --- /dev/null +++ b/lib/dl_maintenance/dl_maintenance/api/common.py @@ -0,0 +1,63 @@ +from __future__ import annotations + +from typing import ( + TYPE_CHECKING, + Optional, + Type, +) + +import attr + +from dl_api_lib.app_common import SRFactoryBuilder +from dl_api_lib.app_common_settings import ConnOptionsMutatorsFactory +from dl_api_lib.app_settings import AppSettings +from dl_api_lib.loader import ( + ApiLibraryConfig, + load_api_lib, + preload_api_lib, +) +from dl_configs.settings_loaders.loader_env import ( + load_connectors_settings_from_env_with_fallback, + load_settings_from_env_with_fallback, +) +from dl_core.connectors.settings.registry import ( + CONNECTORS_SETTINGS_CLASSES, + CONNECTORS_SETTINGS_FALLBACKS, +) +from dl_core.loader import CoreLibraryConfig +from dl_maintenance.core.common import MaintenanceEnvironmentManagerBase + + +if TYPE_CHECKING: + from dl_core.services_registry.sr_factories import SRFactory + + +@attr.s(kw_only=True) +class MaintenanceEnvironmentManager(MaintenanceEnvironmentManagerBase): + _app_settings_cls: Type[AppSettings] = attr.ib() + _app_factory_cls: Optional[Type[SRFactoryBuilder]] = attr.ib(default=None) + + def get_app_settings(self) -> AppSettings: + preload_api_lib() + settings = load_settings_from_env_with_fallback(self._app_settings_cls) + load_api_lib( + ApiLibraryConfig( + api_connector_ep_names=settings.BI_API_CONNECTOR_WHITELIST, + core_lib_config=CoreLibraryConfig(core_connector_ep_names=settings.CORE_CONNECTOR_WHITELIST), + ) + ) + return settings + + def get_sr_factory(self, is_async_env: bool) -> Optional[SRFactory]: + assert self._app_factory_cls is not None + conn_opts_factory = ConnOptionsMutatorsFactory() + settings = self.get_app_settings() + sr_factory = self._app_factory_cls(settings=settings).get_sr_factory( # type: ignore + settings=settings, + conn_opts_factory=conn_opts_factory, + connectors_settings=load_connectors_settings_from_env_with_fallback( + settings_registry=CONNECTORS_SETTINGS_CLASSES, + fallbacks=CONNECTORS_SETTINGS_FALLBACKS, + ), + ) + return sr_factory diff --git a/lib/dl_maintenance/dl_maintenance/api/crawler_runner.py b/lib/dl_maintenance/dl_maintenance/api/crawler_runner.py new file mode 100644 index 000000000..5e9cc078c --- /dev/null +++ b/lib/dl_maintenance/dl_maintenance/api/crawler_runner.py @@ -0,0 +1,54 @@ +from __future__ import annotations + +import logging +from typing import Optional + +from dl_app_tools import log +from dl_core.us_manager.us_manager_async import AsyncUSManager +from dl_maintenance.api.common import MaintenanceEnvironmentManager +from dl_maintenance.core.logging_config import configure_logging_for_shell +from dl_maintenance.core.us_crawler_base import USEntryCrawler + + +LOGGER = logging.getLogger(__name__) + + +async def run_crawler( + m_manager: MaintenanceEnvironmentManager, + crawler: 
USEntryCrawler, + usm: Optional[AsyncUSManager] = None, + configure_logging: bool = True, + use_sr_factory: bool = False, +) -> None: + """Runner to run crawler in interactive shell""" + red = "\x1b[31m" + nc = "\x1b[0m" + + if configure_logging: + configure_logging_for_shell() + try: + if usm is None: + usm = m_manager.get_async_usm_from_env(use_sr_factory=use_sr_factory) + crawler.set_usm(usm) + req_id = "__".join( + req_id + for req_id in ( + log.context.get_log_context().get("request_id"), + f"cr.{crawler.run_id}", + ) + if req_id is not None + ) + with log.context.log_context(request_id=req_id): + await crawler.run() + finally: + if configure_logging: + print( + "\n".join( + ( + red, + "!!! LOGGING WAS CONFIGURED TO SEND LOGS TO CENTRAL STORAGE !!!", + "!!! NOTICE THAT ALL FURTHER LOGS WILL BE SENT TO CENTRAL STORAGE !!!", + nc, + ) + ) + ) diff --git a/lib/dl_maintenance/dl_maintenance/api/crawlers/__init__.py b/lib/dl_maintenance/dl_maintenance/api/crawlers/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lib/dl_maintenance/dl_maintenance/api/data_access.py b/lib/dl_maintenance/dl_maintenance/api/data_access.py new file mode 100644 index 000000000..a19bbd120 --- /dev/null +++ b/lib/dl_maintenance/dl_maintenance/api/data_access.py @@ -0,0 +1,237 @@ +""" +Usage: + +from dl_api_lib.app_settings import DataApiAppSettings +from dl_data_api.app_factory import StandaloneDataApiAppFactory + +from dl_maintenance.api.common import MaintenanceEnvironmentManager +from dl_maintenance.api.helpers import get_sync_usm, get_dataset +from dl_maintenance.api.data_access import get_result_data, make_query_spec_union + +mm = MaintenanceEnvironmentManager(app_settings_cls=DataApiAppSettings, app_factory_cls=StandaloneDataApiAppFactory) +ds = get_dataset(mm, 'hfu4hg98wh48', is_async_env=True) +raw_query_spec_union = make_query_spec_union(select=['My Field', 'fd83956d-8da7-4908-b376-ffc11576583a']) +us_manager = get_sync_usm() +response_json = await get_result_data(dataset=ds, us_manager=us_manager, raw_query_spec=raw_query_spec) +print_result_data(response_json) + +""" + +from typing import ( + Iterable, + Optional, + Union, +) + +import tabulate + +from dl_api_commons.base_models import RequestContextInfo +from dl_api_commons.reporting.registry import ( + DefaultReportingRegistry, + ReportingRegistry, +) +from dl_api_lib.api_common.data_serialization import ( + DataRequestResponseSerializer, + PivotDataRequestResponseSerializer, +) +from dl_api_lib.dataset.view import DatasetView +from dl_api_lib.pivot.pandas.transformer import PdPivotTransformer +from dl_api_lib.query.formalization.block_formalizer import BlockFormalizer +from dl_api_lib.query.formalization.legend_formalizer import ResultLegendFormalizer +from dl_api_lib.query.formalization.pivot_formalizer import PivotFormalizer +from dl_api_lib.query.formalization.raw_pivot_specs import ( + RawDimensionPivotRoleSpec, + RawPivotLegendItem, + RawPivotMeasureRoleSpec, + RawPivotSpec, +) +from dl_api_lib.query.formalization.raw_specs import ( + IdOrTitleFieldRef, + RawQuerySpecUnion, + RawSelectFieldSpec, +) +from dl_api_lib.request_model.normalization.drm_normalizer_pivot import PivotSpecNormalizer +from dl_constants.enums import PivotRole +from dl_core.us_dataset import Dataset +from dl_core.us_manager.us_manager import USManagerBase +from dl_query_processing.legend.block_legend import BlockSpec +from dl_query_processing.merging.merger import DataStreamMerger +from dl_query_processing.merging.primitives import 
MergedQueryDataStream +from dl_query_processing.postprocessing.postprocessor import DataPostprocessor +from dl_query_processing.postprocessing.primitives import ( + PostprocessedQueryBlock, + PostprocessedQueryUnion, +) + + +def get_ds_view( + dataset: Dataset, + us_manager: USManagerBase, + block_spec: BlockSpec, + rci: Optional[RequestContextInfo] = None, +) -> DatasetView: + ds_view = DatasetView( + ds=dataset, + us_manager=us_manager, + block_spec=block_spec, + rci=rci or RequestContextInfo.create_empty(), + ) + return ds_view + + +def make_query_spec_union( + select: Union[list[str], dict[str, int]], + disable_rls: bool = True, +) -> RawQuerySpecUnion: + if not isinstance(select, dict): + select = {item: idx for idx, item in enumerate(select)} + assert isinstance(select, dict) + return RawQuerySpecUnion( + select_specs=[ + RawSelectFieldSpec( + ref=IdOrTitleFieldRef(id_or_title=item), + legend_item_id=liid, + ) + for item, liid in select.items() + ], + disable_rls=disable_rls, + ) + + +def make_pivot_spec( + row_dimensions: Iterable[str], + column_dimensions: Iterable[str], + measures: Iterable[str], + normalize: bool = True, +) -> tuple[RawQuerySpecUnion, RawPivotSpec]: + row_dimensions = list(row_dimensions) + column_dimensions = list(column_dimensions) + measures = list(measures) + + select = {item: idx for idx, item in enumerate(row_dimensions + column_dimensions + measures)} + raw_query_spec_union = make_query_spec_union(select=select) + pivot_items = [ + *( + RawPivotLegendItem( + legend_item_ids=[select[item]], + role_spec=RawDimensionPivotRoleSpec(role=PivotRole.pivot_row), + ) + for item in row_dimensions + ), + *( + RawPivotLegendItem( + legend_item_ids=[select[item]], + role_spec=RawDimensionPivotRoleSpec(role=PivotRole.pivot_column), + ) + for item in column_dimensions + ), + *( + RawPivotLegendItem( + legend_item_ids=[select[item]], + role_spec=RawPivotMeasureRoleSpec(role=PivotRole.pivot_measure), + ) + for item in measures + ), + ] + raw_pivot_spec = RawPivotSpec(structure=pivot_items) + + if normalize: + normalizer = PivotSpecNormalizer() + raw_pivot_spec, raw_query_spec_union = normalizer.normalize_spec( + spec=raw_pivot_spec, raw_query_spec_union=raw_query_spec_union + ) + + return raw_query_spec_union, raw_pivot_spec + + +async def get_merged_data_stream( + dataset: Dataset, + us_manager: USManagerBase, + raw_query_spec_union: RawQuerySpecUnion, + allow_query_cache_usage: bool = True, + reporting_registry: Optional[ReportingRegistry] = None, +) -> MergedQueryDataStream: + profiler_prefix = "maintenance-result" + + legend_formalizer = ResultLegendFormalizer(dataset=dataset) + legend = legend_formalizer.make_legend(raw_query_spec_union=raw_query_spec_union) + block_formalizer = BlockFormalizer(dataset=dataset, reporting_registry=reporting_registry) + block_legend = block_formalizer.make_block_legend( + raw_query_spec_union=raw_query_spec_union, + legend=legend, + ) + rci = reporting_registry.rci if reporting_registry is not None else RequestContextInfo.create_empty() + ds_view = get_ds_view(dataset=dataset, us_manager=us_manager, block_spec=block_legend.blocks[0], rci=rci) + exec_info = ds_view.build_exec_info() + executed_query = await ds_view.get_data_async( + exec_info=exec_info, + allow_cache_usage=allow_query_cache_usage, + ) + postprocessor = DataPostprocessor(profiler_prefix=profiler_prefix) + postprocessed_query = postprocessor.get_postprocessed_data( + executed_query=executed_query, + block_spec=block_legend.blocks[0], + ) + postprocessed_query_union = 
PostprocessedQueryUnion( + blocks=[ + PostprocessedQueryBlock.from_block_spec( + block_spec=block_legend.blocks[0], + postprocessed_query=postprocessed_query, + ) + ], + legend=legend, + limit=raw_query_spec_union.limit, + offset=raw_query_spec_union.offset, + ) + merged_stream = DataStreamMerger().merge(postprocessed_query_union=postprocessed_query_union) + return merged_stream + + +async def get_result_data( + dataset: Dataset, + us_manager: USManagerBase, + raw_query_spec_union: RawQuerySpecUnion, + allow_query_cache_usage: bool = True, +) -> dict: + merged_stream = await get_merged_data_stream( + dataset=dataset, + us_manager=us_manager, + raw_query_spec_union=raw_query_spec_union, + allow_query_cache_usage=allow_query_cache_usage, + ) + response_json = DataRequestResponseSerializer.make_data_response_v1(merged_stream=merged_stream) + return response_json + + +async def get_pivot_data( + dataset: Dataset, + us_manager: USManagerBase, + raw_query_spec_union: RawQuerySpecUnion, + raw_pivot_spec: RawPivotSpec, + allow_query_cache_usage: bool = True, +) -> dict: + reporting_registry = DefaultReportingRegistry(rci=RequestContextInfo.create_empty()) + merged_stream = await get_merged_data_stream( + dataset=dataset, + us_manager=us_manager, + raw_query_spec_union=raw_query_spec_union, + allow_query_cache_usage=allow_query_cache_usage, + reporting_registry=reporting_registry, + ) + pivot_formalizer = PivotFormalizer(dataset=dataset, legend=merged_stream.legend) + pivot_legend = pivot_formalizer.make_pivot_legend(raw_pivot_spec=raw_pivot_spec) + pivot_transformer = PdPivotTransformer(legend=merged_stream.legend, pivot_legend=pivot_legend) + pivot_table = pivot_transformer.pivot(rows=merged_stream.rows) + response_json = PivotDataRequestResponseSerializer.make_pivot_response( + merged_stream=merged_stream, + pivot_table=pivot_table, + reporting_registry=reporting_registry, + ) + return response_json + + +def print_result_data(response_json: dict) -> None: + result_data = response_json["result"]["data"] + headers = [item[0] for item in result_data["Type"][1][1]] + data = result_data["Data"] + print(tabulate.tabulate(data, headers=headers, tablefmt="pipe")) diff --git a/lib/dl_maintenance/dl_maintenance/api/errors.py b/lib/dl_maintenance/dl_maintenance/api/errors.py new file mode 100644 index 000000000..402d942c8 --- /dev/null +++ b/lib/dl_maintenance/dl_maintenance/api/errors.py @@ -0,0 +1,90 @@ +""" +Sample usage (remove phantom errors from dataset): + +from dl_api_lib.app_settings import DataApiAppSettings +from dl_data_api.app_factory import StandaloneDataApiAppFactory + +from dl_maintenance.api.common import MaintenanceEnvironmentManager +from dl_maintenance.api.helpers import get_dataset +from dl_maintenance.api.errors import ComponentErrorManager + +mm = MaintenanceEnvironmentManager(app_settings_cls=DataApiAppSettings, app_factory_cls=StandaloneDataApiAppFactory) +us_manager = mm.get_usm_from_env() +ds = get_dataset(mm, 'hfu4hg98wh48', is_async_env=False) +error_mgr = ComponentErrorManager(dataset=ds, us_manager=us_manager) +phantom_refs = error_mgr.find_phantom_error_refs() +error_mgr.print_component_refs(phantom_refs) +# error_mgr.remove_errors_for_refs(phantom_refs) +# us_manager.save(ds) + +""" + +from collections import defaultdict +from typing import ( + Any, + Dict, + List, + Optional, +) +import uuid + +import attr + +from dl_api_lib.dataset.component_abstraction import ( + DatasetComponentAbstraction, + DatasetComponentRef, +) +from dl_constants.enums import ComponentType +from 
dl_core.us_dataset import Dataset +from dl_core.us_manager.us_manager import USManagerBase + + +TACTION = Dict[str, Any] + + +def _make_component_id() -> str: + return str(uuid.uuid4()) + + +@attr.s +class ComponentErrorManager: + _dataset: Optional[Dataset] = attr.ib(kw_only=True, default=None) + _dca: DatasetComponentAbstraction = attr.ib(init=False) + _us_manager: USManagerBase = attr.ib(kw_only=True) + + def __attrs_post_init__(self) -> None: + self._dca = DatasetComponentAbstraction( + dataset=self.dataset, us_entry_buffer=self._us_manager.get_entry_buffer() + ) + + @property + def dataset(self) -> Dataset: + assert self._dataset is not None + return self._dataset + + def find_phantom_error_refs(self) -> List[DatasetComponentRef]: + all_error_refs = [ + DatasetComponentRef(component_type=item.type, component_id=item.id) + for item in self.dataset.error_registry.items + ] + phantom_refs: List[DatasetComponentRef] = [] + for component_ref in all_error_refs: + if self._dca.get_component(component_ref=component_ref) is None: + phantom_refs.append(component_ref) + + return phantom_refs + + @staticmethod + def print_component_refs(component_refs: List[DatasetComponentRef]) -> None: + by_comp_type: Dict[ComponentType, List[str]] = defaultdict(list) + for component_ref in component_refs: + by_comp_type[component_ref.component_type].append(component_ref.component_id) + + for component_type, component_id_list in sorted(by_comp_type.items(), key=lambda el: el[0].name): + print(f"{component_type.name}:") + for component_id in sorted(component_id_list): + print(f" {component_id}") + + def remove_errors_for_refs(self, component_refs: List[DatasetComponentRef]) -> None: + for component_ref in component_refs: + self.dataset.error_registry.remove_errors(id=component_ref.component_id) diff --git a/lib/dl_maintenance/dl_maintenance/api/helpers.py b/lib/dl_maintenance/dl_maintenance/api/helpers.py new file mode 100644 index 000000000..caf92f360 --- /dev/null +++ b/lib/dl_maintenance/dl_maintenance/api/helpers.py @@ -0,0 +1,43 @@ +""" +Common cases: + +from dl_api_lib.app_settings import DataApiAppSettings +from dl_data_api.app_factory import StandaloneDataApiAppFactory + +from dl_maintenance.api.common import MaintenanceEnvironmentManager +from dl_maintenance.api.helpers import get_migration_entry, dump_entry_data + +mm = MaintenanceEnvironmentManager(app_settings_cls=DataApiAppSettings, app_factory_cls=StandaloneDataApiAppFactory) +entry = get_migration_entry(mm, 'fweavr4awr332c') +dump_entry_data(entry) + +""" + + +import json + +from dl_core.us_dataset import Dataset +from dl_core.us_entry import ( + USEntry, + USMigrationEntry, +) +from dl_maintenance.api.common import MaintenanceEnvironmentManager + + +def get_migration_entry(m_manager: MaintenanceEnvironmentManager, entry_id: str) -> USMigrationEntry: + usm = m_manager.get_usm_from_env(use_sr_factory=False, is_async_env=False) + return usm.get_by_id(entry_id=entry_id, expected_type=USMigrationEntry) + + +def get_entry(m_manager: MaintenanceEnvironmentManager, entry_id: str, is_async_env: bool = True) -> USEntry: + usm = m_manager.get_usm_from_env(is_async_env=is_async_env) + return usm.get_by_id(entry_id=entry_id, expected_type=USMigrationEntry) + + +def get_dataset(m_manager: MaintenanceEnvironmentManager, entry_id: str, is_async_env: bool = True) -> Dataset: + usm = m_manager.get_usm_from_env(is_async_env=is_async_env) + return usm.get_by_id(entry_id=entry_id, expected_type=Dataset) + + +def dump_entry_data(entry: USMigrationEntry) -> str: + 
return json.dumps(entry.data) diff --git a/lib/dl_maintenance/dl_maintenance/api/updates.py b/lib/dl_maintenance/dl_maintenance/api/updates.py new file mode 100644 index 000000000..12c714855 --- /dev/null +++ b/lib/dl_maintenance/dl_maintenance/api/updates.py @@ -0,0 +1,181 @@ +""" +Sample usage (replace connection and switch data source to subselect):: + +from dl_api_lib.app_settings import DataApiAppSettings +from dl_data_api.app_factory import StandaloneDataApiAppFactory + +from dl_maintenance.api.common import MaintenanceEnvironmentManager +from dl_maintenance.api.helpers import get_dataset +from dl_maintenance.api.updates import SimpleDatasetUpdateGen, update_dataset +from dl_connector_chyt.core.constants import SOURCE_TYPE_CHYT_YTSAURUS_SUBSELECT + +mm = MaintenanceEnvironmentManager(app_settings_cls=DataApiAppSettings, app_factory_cls=StandaloneDataApiAppFactory) +subsql = "SELECT * FROM my_table" +ds = get_dataset(mm, "hfu4hg98wh48", is_async_env=False) +us_manager = mm.get_usm_from_env() +update_gen = SimpleDatasetUpdateGen(dataset=ds, us_manager=us_manager) +update_dataset( + dataset=ds, + us_manager=us_manager, + updates=[ + update_gen.replace_connection(old_id="abzgni02ra8a7", new_id="lg6crpowhm3ij"), + update_gen.update_source_as_subselect( + id="bb46b7c1-9d5f-11eb-841a-43a71976c9af", + source_type=SOURCE_TYPE_CHYT_YTSAURUS_SUBSELECT, subsql=subsql, + ), + ], +) + +""" + +from typing import ( + List, + Optional, + Sequence, +) +import uuid + +import attr + +from dl_api_lib.dataset.validator import DatasetValidator +from dl_api_lib.enums import DatasetAction +from dl_api_lib.request_model.data import ( + Action, + AddField, + AddUpdateSourceAction, + FieldAction, + ReplaceConnection, + ReplaceConnectionAction, + SourceActionBase, +) +from dl_constants.enums import ( + AggregationFunction, + CalcMode, + DataSourceRole, + DataSourceType, + UserDataType, +) +from dl_core.base_models import DefaultConnectionRef +from dl_core.components.accessor import DatasetComponentAccessor +from dl_core.data_source.collection import DataSourceCollectionFactory +from dl_core.fields import ParameterValueConstraint +from dl_core.us_dataset import Dataset +from dl_core.us_manager.us_manager import USManagerBase +from dl_core.values import BIValue + + +def _make_component_id() -> str: + return str(uuid.uuid4()) + + +@attr.s +class SimpleDatasetUpdateGen: + _dataset: Dataset = attr.ib(kw_only=True) + _us_manager: USManagerBase = attr.ib(kw_only=True) + _ds_accessor: DatasetComponentAccessor = attr.ib(init=False) + + @_ds_accessor.default + def _make_ds_accessor(self) -> DatasetComponentAccessor: + return DatasetComponentAccessor(dataset=self._dataset) + + @property + def dataset(self) -> Dataset: + return self._dataset + + def add_field( + self, + title: str, + formula: Optional[str] = None, + source: Optional[str] = None, + cast: Optional[UserDataType] = None, + aggregation: AggregationFunction = AggregationFunction.none, + default_value: Optional[BIValue] = None, + value_constraint: Optional[ParameterValueConstraint] = None, + ) -> FieldAction: + if default_value is not None: + calc_mode = CalcMode.parameter + elif formula: + calc_mode = CalcMode.formula + else: + calc_mode = CalcMode.direct + + field_id = _make_component_id() + title = title or field_id + + field_data = { + "guid": field_id, + "title": title, + "source": source or "", + "formula": formula or "", + "calc_mode": calc_mode, + "aggregation": aggregation, + "default_value": default_value, + "value_constraint": value_constraint, + } + if 
cast is not None: + field_data["cast"] = cast + + action_data = { + "action": DatasetAction.add_field, + "field": AddField(**field_data), # type: ignore + } + return FieldAction(**action_data) # type: ignore + + def update_source_as_subselect( + self, + id: str, + source_type: DataSourceType, + subsql: str, + ) -> AddUpdateSourceAction: + assert source_type.name.endswith("_SUBSELECT"), "Must be a *_SUBSELECT source type" + + dsrc_coll_spec = self._ds_accessor.get_data_source_coll_spec_strict(source_id=id) + dsrc_coll_factory = DataSourceCollectionFactory(us_entry_buffer=self._us_manager.get_entry_buffer()) + dsrc_coll = dsrc_coll_factory.get_data_source_collection(spec=dsrc_coll_spec) + + dsrc = dsrc_coll.get_strict(role=DataSourceRole.origin) + assert dsrc is not None + conn_ref = dsrc.spec.connection_ref + assert isinstance(conn_ref, DefaultConnectionRef), "Origin connection must be a regular connection" + + source_data = { + "id": id, + "source_type": source_type, + "connection_id": conn_ref.conn_id, + "parameters": { + "subsql": subsql, + }, + } + + action_data = { + "action": DatasetAction.update_source, + "source": source_data, + } + return AddUpdateSourceAction(**action_data) # type: ignore + + def replace_connection(self, old_id: str, new_id: str) -> ReplaceConnectionAction: + connection_data = {"id": old_id, "new_id": new_id} + action_data = { + "action": DatasetAction.replace_connection, + "connection": ReplaceConnection(**connection_data), + } + return ReplaceConnectionAction(**action_data) # type: ignore + + def refresh_source(self, source_id: str) -> SourceActionBase: + action_data = { + "action": DatasetAction.refresh_source, + "source": {"id": source_id}, + } + return SourceActionBase(**action_data) # type: ignore + + def refresh_all_sources(self) -> List[SourceActionBase]: + return [self.refresh_source(source_id) for source_id in self._ds_accessor.get_data_source_id_list()] + + +def update_dataset(dataset: Dataset, updates: Sequence[Action], us_manager: USManagerBase) -> None: + """ + Apply updates to the dataset. 
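+    Actions are applied in order via DatasetValidator.apply_batch (see the function body below).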
+ The dataset must be fetched in sync mode (get_dataset('...', is_async_env=False)) + """ + ds_validator = DatasetValidator(ds=dataset, us_manager=us_manager) + ds_validator.apply_batch(action_batch=updates) diff --git a/lib/dl_maintenance/dl_maintenance/core/__init__.py b/lib/dl_maintenance/dl_maintenance/core/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lib/dl_maintenance/dl_maintenance/core/common.py b/lib/dl_maintenance/dl_maintenance/core/common.py new file mode 100644 index 000000000..5b130d7b2 --- /dev/null +++ b/lib/dl_maintenance/dl_maintenance/core/common.py @@ -0,0 +1,65 @@ +from __future__ import annotations + +from typing import ( + TYPE_CHECKING, + Any, + Optional, +) + +import attr + +from dl_api_commons.base_models import RequestContextInfo +from dl_core.united_storage_client import USAuthContextMaster +from dl_core.us_manager.us_manager_async import AsyncUSManager +from dl_core.us_manager.us_manager_sync import SyncUSManager + + +if TYPE_CHECKING: + from dl_core.services_registry.sr_factories import SRFactory + + +@attr.s +class UsConfig: + base_url: str = attr.ib(kw_only=True) + master_token: str = attr.ib(kw_only=True) + + +class MaintenanceEnvironmentManagerBase: + def get_app_settings(self) -> Any: + raise NotImplementedError + + def get_us_config(self) -> UsConfig: + app_settings = self.get_app_settings() + return UsConfig( + base_url=app_settings.US_BASE_URL, + master_token=app_settings.US_MASTER_TOKEN, + ) + + def get_sr_factory(self, is_async_env: bool) -> Optional[SRFactory]: + return None + + def get_usm_from_env(self, use_sr_factory: bool = True, is_async_env: bool = True) -> SyncUSManager: + us_config = self.get_us_config() + rci = RequestContextInfo.create_empty() + sr_factory = self.get_sr_factory(is_async_env=is_async_env) if use_sr_factory else None + service_registry = sr_factory.make_service_registry(rci) if sr_factory is not None else None + + return SyncUSManager( + us_base_url=us_config.base_url, + us_auth_context=USAuthContextMaster(us_master_token=us_config.master_token), + bi_context=rci, + services_registry=service_registry, + ) + + def get_async_usm_from_env(self, use_sr_factory: bool = True) -> AsyncUSManager: + us_config = self.get_us_config() + rci = RequestContextInfo.create_empty() + sr_factory = self.get_sr_factory(is_async_env=True) if use_sr_factory else None + service_registry = sr_factory.make_service_registry(rci) if sr_factory is not None else None + + return AsyncUSManager( + us_base_url=us_config.base_url, + us_auth_context=USAuthContextMaster(us_master_token=us_config.master_token), + bi_context=rci, + services_registry=service_registry, + ) diff --git a/lib/dl_maintenance/dl_maintenance/core/crawlers/__init__.py b/lib/dl_maintenance/dl_maintenance/core/crawlers/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lib/dl_maintenance/dl_maintenance/core/crawlers/config_metric_collector.py b/lib/dl_maintenance/dl_maintenance/core/crawlers/config_metric_collector.py new file mode 100644 index 000000000..568f08144 --- /dev/null +++ b/lib/dl_maintenance/dl_maintenance/core/crawlers/config_metric_collector.py @@ -0,0 +1,306 @@ +""" +See: dl_core/maintenance/README.md ++ +crawler.print_stats() # chars=False) + +""" + +from __future__ import annotations + +from collections import defaultdict +from enum import ( + Enum, + auto, + unique, +) +import json +import statistics +from typing import ( + Any, + AsyncIterable, + Generic, + Optional, + Sequence, + TypeVar, + Union, +) + +import attr + 
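+# Rough usage sketch (assumes a configured AsyncUSManager, e.g. obtained from
+# MaintenanceEnvironmentManager.get_async_usm_from_env()):
+#     crawler = DatasetMetricCollector(dry_run=True)
+#     crawler.set_usm(usm)
+#     await crawler.run()       # or hand the crawler to dl_maintenance.api.crawler_runner.run_crawler
+#     crawler.print_stats()     # pass chars=False to skip the per-character aggregates
+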
+from dl_core.us_entry import ( + USEntry, + USMigrationEntry, +) +from dl_core.us_manager.us_manager_async import AsyncUSManager +from dl_maintenance.core.us_crawler_base import USEntryCrawler + + +DEFAULT_QUANTILE_SETTINGS = (1000, [100, 750, 900, 950, 999]) + + +@unique +class Metric(Enum): + # Field + ds_cnt_field_all = auto() + ds_cnt_field_direct = auto() + ds_cnt_field_formula = auto() + ds_cnt_field_parameter = auto() + ds_len_field_title = auto() + ds_len_field_formula = auto() + ds_len_field_formula_line = auto() + ds_cnt_field_formula_lines = auto() + ds_len_field_source = auto() + ds_len_field_guid = auto() + ds_chars_field_guid = auto() + ds_chars_field_title = auto() + ds_chars_field_formula = auto() + ds_chars_field_source = auto() + # Source + ds_cnt_source = auto() + ds_cnt_source_columns = auto() + ds_len_source_id = auto() + ds_len_source_db_name = auto() + ds_chars_source_db_name = auto() + ds_len_source_schema_name = auto() + ds_chars_source_schema_name = auto() + ds_len_source_table_name = auto() + ds_chars_source_table_name = auto() + ds_len_source_table_names = auto() + ds_chars_source_table_names = auto() + ds_cnt_source_table_names_lines = auto() + ds_len_source_table_names_line = auto() + ds_len_source_subsql = auto() + ds_chars_source_subsql = auto() + ds_len_source_subsql_line = auto() + ds_cnt_source_subsql_lines = auto() + ds_len_source_column_name = auto() + ds_len_source_column_title = auto() + ds_chars_source_column_name = auto() + ds_chars_source_column_title = auto() + # Avatar + ds_cnt_avatar = auto() + ds_len_avatar_id = auto() + ds_len_avatar_title = auto() + ds_chars_avatar_title = auto() + # Relation + ds_cnt_relation = auto() + ds_cnt_relation_condition = auto() + ds_len_relation_id = auto() + # Obligatory filter + ds_cnt_ob_filter = auto() + ds_cnt_ob_filter_conditions = auto() + ds_cnt_ob_filter_args = auto() + ds_len_ob_filter_id = auto() + ds_len_ob_filter_arg = auto() + + +M = Metric # for convenience + +_RAW_VALUE_TV = TypeVar("_RAW_VALUE_TV") + + +class RawValueCollector(Generic[_RAW_VALUE_TV]): + def add_value(self, value: _RAW_VALUE_TV) -> None: + raise NotImplementedError + + def get_aggregate(self, **kwargs: Any) -> dict[str, Any]: + raise NotImplementedError + + +@attr.s +class RawIntegerCollector(RawValueCollector[int]): + _values: list[int] = attr.ib(init=False, factory=list) + + def add_value(self, value: int) -> None: + self._values.append(value) + + def _get_quantiles( + self, + parts: int, + return_values: Sequence[int], + ) -> dict[str, Union[int, float]]: + results = statistics.quantiles(self._values, n=parts, method="inclusive") + return {f"q{parts}_{ret_idx}": results[ret_idx - 1] for ret_idx in return_values} + + def get_aggregate( + self, + quantile_settings: tuple[int, Sequence[int]] = DEFAULT_QUANTILE_SETTINGS, + **kwargs: Any, + ) -> dict[str, Any]: + return dict( + min=min(self._values), + max=max(self._values), + mean=statistics.mean(self._values), + median=statistics.median(self._values), + **self._get_quantiles( + parts=quantile_settings[0], + return_values=quantile_settings[1], + ), + ) + + +@attr.s +class RawCharCollector(RawValueCollector[str]): + _values: set[str] = attr.ib(init=False, factory=set) + + def add_value(self, value: str) -> None: + self._values |= set(value) + + def get_aggregate(self, **kwargs: Any) -> dict[str, Any]: + return dict(chars="".join(sorted(self._values))) + + +@attr.s +class StatisticsCollector: + _int_collectors: dict[Metric, RawIntegerCollector] = attr.ib( + init=False, factory=lambda: 
defaultdict(RawIntegerCollector) + ) + _char_collectors: dict[Metric, RawCharCollector] = attr.ib( + init=False, factory=lambda: defaultdict(RawCharCollector) + ) + + def int_collector(self, metric: Metric) -> RawIntegerCollector: + return self._int_collectors[metric] + + def char_collector(self, metric: Metric) -> RawCharCollector: + return self._char_collectors[metric] + + def get_jsonable_dict( + self, + quantile_settings: tuple[int, Sequence[int]], + chars: bool = True, + ) -> dict[str, dict[str, Any]]: + result = { + key.name: coll.get_aggregate(quantile_settings=quantile_settings) + for key, coll in self._int_collectors.items() + } + if chars: + result.update({key.name: coll.get_aggregate() for key, coll in self._char_collectors.items()}) + + return result + + +@attr.s +class DatasetMetricCollector(USEntryCrawler): + ENTRY_TYPE = USMigrationEntry + + _stats_collector: StatisticsCollector = attr.ib(init=False, factory=StatisticsCollector) + _quantile_settings: tuple[int, Sequence[int]] = attr.ib(kw_only=True, default=DEFAULT_QUANTILE_SETTINGS) + + def get_raw_entry_iterator(self, crawl_all_tenants: bool = True) -> AsyncIterable[dict[str, Any]]: + return self.usm.get_raw_collection(entry_scope="dataset", all_tenants=crawl_all_tenants) + + async def process_entry_get_save_flag( + self, entry: USEntry, logging_extra: dict[str, Any], usm: Optional[AsyncUSManager] = None + ) -> tuple[bool, str]: + data = entry.data + self._stats_collector.int_collector(M.ds_cnt_field_all).add_value(len(data["result_schema"])) + self._stats_collector.int_collector(M.ds_cnt_field_formula).add_value( + len([field for field in data["result_schema"] if field["calc_mode"] == "formula"]) + ) + self._stats_collector.int_collector(M.ds_cnt_field_direct).add_value( + len([field for field in data["result_schema"] if field["calc_mode"] == "direct"]) + ) + self._stats_collector.int_collector(M.ds_cnt_field_parameter).add_value( + len([field for field in data["result_schema"] if field["calc_mode"] == "parameter"]) + ) + for field in data["result_schema"]: + self._stats_collector.int_collector(M.ds_len_field_guid).add_value(len(field["guid"])) + self._stats_collector.int_collector(M.ds_len_field_title).add_value(len(field["title"])) + self._stats_collector.char_collector(M.ds_chars_field_guid).add_value(field["guid"]) + self._stats_collector.char_collector(M.ds_chars_field_title).add_value(field["title"]) + if field["calc_mode"] == "formula": + formula = field["formula"] + self._stats_collector.int_collector(M.ds_len_field_formula).add_value(len(formula)) + self._stats_collector.char_collector(M.ds_chars_field_formula).add_value(formula) + formula_lines = formula.split("\n") + self._stats_collector.int_collector(M.ds_cnt_field_formula_lines).add_value(len(formula_lines)) + for f_line in formula_lines: + self._stats_collector.int_collector(M.ds_len_field_formula_line).add_value(len(f_line)) + elif field["calc_mode"] == "direct": + self._stats_collector.int_collector(M.ds_len_field_source).add_value(len(field["source"])) + self._stats_collector.char_collector(M.ds_chars_field_source).add_value(field["source"]) + + self._stats_collector.int_collector(M.ds_cnt_source).add_value(len(data.get("source_collections", []))) + for dsrc_coll in data.get("source_collections", []): + self._stats_collector.int_collector(M.ds_len_source_id).add_value(len(dsrc_coll["id"])) + origin = dsrc_coll.get("origin") + if origin is not None: + origin = dict(origin, **origin.get("parameters", {})) + if origin.get("db_name"): + 
self._stats_collector.int_collector(M.ds_len_source_db_name).add_value(len(origin["db_name"])) + self._stats_collector.char_collector(M.ds_chars_source_db_name).add_value(origin["db_name"]) + if origin.get("schema_name"): + self._stats_collector.int_collector(M.ds_len_source_schema_name).add_value( + len(origin["schema_name"]) + ) + self._stats_collector.char_collector(M.ds_chars_source_schema_name).add_value(origin["schema_name"]) + if origin.get("table_name"): + self._stats_collector.int_collector(M.ds_len_source_table_name).add_value(len(origin["table_name"])) + self._stats_collector.char_collector(M.ds_chars_source_table_name).add_value(origin["table_name"]) + if origin.get("table_names"): + table_names = origin["table_names"] + self._stats_collector.int_collector(M.ds_len_source_table_names).add_value(len(table_names)) + self._stats_collector.char_collector(M.ds_chars_source_table_names).add_value(table_names) + table_names_lines = table_names.split("\n") + self._stats_collector.int_collector(M.ds_cnt_source_table_names_lines).add_value( + len(table_names_lines) + ) + for table_names_line in table_names_lines: + self._stats_collector.int_collector(M.ds_len_source_table_names_line).add_value( + len(table_names_line) + ) + if origin.get("subsql"): + subsql = origin["subsql"] + self._stats_collector.int_collector(M.ds_len_source_subsql).add_value(len(subsql)) + self._stats_collector.char_collector(M.ds_chars_source_subsql).add_value(subsql) + subsql_lines = subsql.split("\n") + self._stats_collector.int_collector(M.ds_cnt_source_subsql_lines).add_value(len(subsql_lines)) + for sub_line in subsql_lines: + self._stats_collector.int_collector(M.ds_len_source_subsql_line).add_value(len(sub_line)) + if origin.get("raw_schema"): + self._stats_collector.int_collector(M.ds_cnt_source_columns).add_value(len(origin["raw_schema"])) + for column in origin["raw_schema"]: + self._stats_collector.int_collector(M.ds_len_source_column_name).add_value(len(column["name"])) + self._stats_collector.int_collector(M.ds_len_source_column_title).add_value( + len(column["title"]) + ) + self._stats_collector.char_collector(M.ds_chars_source_column_name).add_value(column["name"]) + title = column.get("title") + if title is not None: + self._stats_collector.char_collector(M.ds_chars_source_column_title).add_value(title) + + self._stats_collector.int_collector(M.ds_cnt_avatar).add_value(len(data.get("source_avatars", []))) + for avatar in data.get("source_avatars", []): + self._stats_collector.int_collector(M.ds_len_avatar_id).add_value(len(avatar["id"])) + if avatar["title"]: + self._stats_collector.int_collector(M.ds_len_avatar_title).add_value(len(avatar["title"])) + self._stats_collector.char_collector(M.ds_chars_avatar_title).add_value(avatar["title"]) + + self._stats_collector.int_collector(M.ds_cnt_relation).add_value(len(data.get("avatar_relations", []))) + for rel in data.get("avatar_relations", []): + self._stats_collector.int_collector(M.ds_len_relation_id).add_value(len(rel["id"])) + self._stats_collector.int_collector(M.ds_cnt_relation_condition).add_value(len(rel["conditions"])) + + self._stats_collector.int_collector(M.ds_cnt_ob_filter).add_value(len(data.get("obligatory_filters", []))) + for obf in data.get("obligatory_filters", []): + self._stats_collector.int_collector(M.ds_len_ob_filter_id).add_value(len(obf["id"])) + self._stats_collector.int_collector(M.ds_cnt_ob_filter_conditions).add_value(len(obf["default_filters"])) + for obf_cond in obf["default_filters"]: + 
self._stats_collector.int_collector(M.ds_cnt_ob_filter_args).add_value(len(obf_cond["values"])) + for obf_cond_value in obf_cond["values"]: + self._stats_collector.int_collector(M.ds_len_ob_filter_arg).add_value(len(obf_cond_value)) + + return False, "..." + + def print_stats(self, chars: bool = True) -> None: + print( + json.dumps( + self._stats_collector.get_jsonable_dict( + quantile_settings=self._quantile_settings, + chars=chars, + ), + sort_keys=True, + indent=4, + ) + ) diff --git a/lib/dl_maintenance/dl_maintenance/core/crawlers/crypto_keys_rotation.py b/lib/dl_maintenance/dl_maintenance/core/crawlers/crypto_keys_rotation.py new file mode 100644 index 000000000..e4fe90b45 --- /dev/null +++ b/lib/dl_maintenance/dl_maintenance/core/crawlers/crypto_keys_rotation.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +import logging +from typing import ( + Any, + AsyncIterable, + Optional, +) + +import attr + +from dl_core.us_entry import ( + USEntry, + USMigrationEntry, +) +from dl_core.us_manager.us_manager_async import AsyncUSManager +from dl_maintenance.core.us_crawler_base import USEntryCrawler + + +LOGGER = logging.getLogger(__name__) + + +@attr.s +class RotateCryptoKeyInConnection(USEntryCrawler): + ENTRY_TYPE = USMigrationEntry + + def get_raw_entry_iterator(self, crawl_all_tenants: bool) -> AsyncIterable[dict[str, Any]]: + return self.usm.get_raw_collection(entry_scope="connection", all_tenants=crawl_all_tenants) + + async def process_entry_get_save_flag( + self, entry: USEntry, logging_extra: dict[str, Any], usm: Optional[AsyncUSManager] = None + ) -> tuple[bool, str]: + assert isinstance(entry, USMigrationEntry) + fields_key_info = self.usm.get_sensitive_fields_key_info(entry) + if len(fields_key_info) == 0: + return False, "Entry does not contains sensitive fields" + + fields_with_non_actual_crypto_keys = [ + field_name + for field_name, key_info in fields_key_info.items() + if key_info is not None + and key_info.key_id is not None # Case if in versioned data and value of field is None + and key_info.key_id != self.usm.actual_crypto_key_id + ] + fields_with_non_actual_crypto_keys.sort() + + missing_fields = [field_name for field_name, key_info in fields_key_info.items() if key_info is None] + + if missing_fields: + LOGGER.warning( + "US entry has missing sensitive fields: %s %s", entry.uuid, missing_fields, extra=logging_extra + ) + + if len(fields_with_non_actual_crypto_keys) == 0: + return False, "All sensitive fields are encrypted with actual keys" + + self.usm.actualize_crypto_keys(entry) + return True, f"Some sensitive fields are encrypted with not actual keys: {fields_with_non_actual_crypto_keys}" diff --git a/lib/dl_maintenance/dl_maintenance/core/logging_config.py b/lib/dl_maintenance/dl_maintenance/core/logging_config.py new file mode 100644 index 000000000..b7d48928d --- /dev/null +++ b/lib/dl_maintenance/dl_maintenance/core/logging_config.py @@ -0,0 +1,20 @@ +from __future__ import annotations + +from typing import Optional + +import shortuuid + +from dl_app_tools import log +from dl_core.logging_config import ( + configure_logging, + logcfg_process_stream_human_readable, +) + + +def configure_logging_for_shell(app_name: str = "ad_hoc_operation", for_development: Optional[bool] = None) -> None: + if "request_id" not in log.context.get_log_context(): + log.context.put_to_context("request_id", shortuuid.uuid()) + + configure_logging( + for_development=for_development, app_name=app_name, logcfg_processors=(logcfg_process_stream_human_readable,) + ) diff --git 
a/lib/dl_maintenance/dl_maintenance/core/us_crawler_base.py b/lib/dl_maintenance/dl_maintenance/core/us_crawler_base.py new file mode 100644 index 000000000..c2bd297c2 --- /dev/null +++ b/lib/dl_maintenance/dl_maintenance/core/us_crawler_base.py @@ -0,0 +1,373 @@ +from __future__ import annotations + +import contextlib +import copy +import enum +import logging +import time +from typing import ( + Any, + AsyncGenerator, + AsyncIterable, + ClassVar, + Optional, + Sequence, + Type, +) + +import attr +import shortuuid + +from dl_api_commons.base_models import TenantDef +from dl_api_commons.logging import ( + extra_with_evt_code, + format_dict, +) +from dl_core.us_entry import ( + USEntry, + USMigrationEntry, +) +from dl_core.us_manager.us_manager_async import AsyncUSManager +from dl_maintenance.diff_utils import ( + get_diff_text, + get_pre_save_top_level_dict, +) +from dl_utils.task_runner import ( + ConcurrentTaskRunner, + TaskRunner, +) + + +LOGGER = logging.getLogger(__name__) + + +class EntryHandlingResult(enum.Enum): + SUCCESS = enum.auto() + FAILED = enum.auto() + SKIPPED = enum.auto() + + +# Logging event code +EVT_CODE_RUN_START = "us_entry_crawler_run_start" +EVT_CODE_ENTRY_HANDLING_START = "us_entry_crawler_entry_handling_start" +EVT_CODE_ENTRY_PROCESSING_SUCCESS = "us_entry_crawler_entry_processing_success" +EVT_CODE_DIFF_CALC_EXC = "us_entry_crawler_entry_diff_calc_exc" +EVT_CODE_ENTRY_HANDLING_END = "us_entry_crawler_entry_handling_done" +EVT_CODE_RUN_END = "us_entry_crawler_run_end" + + +@attr.s +class USEntryCrawler: + ENTRY_TYPE: ClassVar[Type[USEntry]] = None # type: ignore # TODO: fix # Must be set in subclass + + _dry_run: bool = attr.ib() # should always be specified explicitly. + _usm: Optional[AsyncUSManager] = attr.ib(default=None) + _target_tenant: Optional[TenantDef] = attr.ib(default=None) + _concurrency_limit: int = attr.ib(default=10) + # internals + _run_fired: bool = attr.ib(init=False, default=False) + _run_id: str = attr.ib(init=False, factory=shortuuid.uuid) + _task_runner: TaskRunner = attr.ib( + kw_only=True, default=attr.Factory(lambda self: ConcurrentTaskRunner(self._concurrency_limit), takes_self=True) + ) + + def __attrs_post_init__(self) -> None: + if self._usm is not None: + self.ensure_usm_settings_fix_if_possible(self._usm) + + @property + def usm(self) -> AsyncUSManager: + assert self._usm is not None + return self._usm + + @property + def type_name(self) -> str: + return type(self).__qualname__ + + @property + def run_id(self) -> str: + return self._run_id + + def ensure_usm_settings_fix_if_possible(self, usm: AsyncUSManager) -> None: + if self._target_tenant: + # Setting tenant override is required to use regular listing method instead of inter-tenant + usm.set_tenant_override(self._target_tenant) + + def set_usm(self, usm: AsyncUSManager) -> None: + if self._usm is not None: + raise ValueError("USManager already set for this crawler") + self.ensure_usm_settings_fix_if_possible(usm) + self._usm = usm + + def copy(self, reset_usm: bool = True) -> USEntryCrawler: + changes: dict[str, Any] = {} + if reset_usm: + changes.update(usm=None) + return attr.evolve(self, **changes) + + def get_raw_entry_iterator(self, crawl_all_tenants: bool) -> AsyncIterable[dict[str, Any]]: + """ + Must returns iterable of US responses for targets to crawl. + Should be implemented in subclasses. 
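+        For example, the crawlers in this package delegate to
+        self.usm.get_raw_collection(entry_scope=..., all_tenants=crawl_all_tenants).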
+        """
+        raise NotImplementedError()
+
+    async def process_entry_get_save_flag(
+        self, entry: USEntry, logging_extra: dict[str, Any], usm: Optional[AsyncUSManager] = None
+    ) -> tuple[bool, str]:
+        """
+        Process a single entry and return a ``(need_save, message)`` pair.
+        If ``need_save`` is true, the (possibly modified) entry will be saved by the crawler.
+        Should be implemented in subclasses.
+        """
+        raise NotImplementedError()
+
+    @contextlib.asynccontextmanager
+    async def locked_entry_cm(
+        self,
+        entry_id: str,
+        logging_extra: dict[str, Any],
+        usm: AsyncUSManager,
+    ) -> AsyncGenerator[Optional[USEntry], None]:
+        if self._dry_run:
+            try:
+                entry = await usm.get_by_id(entry_id, expected_type=self.ENTRY_TYPE)  # type: ignore # TODO: fix
+            except Exception:
+                logging_extra.update(us_entry_crawler_exc_stage="entry_locked_load")
+                raise
+            else:
+                yield entry
+        else:
+            async with contextlib.AsyncExitStack() as cmstack:
+                try:
+                    entry = await cmstack.enter_async_context(
+                        usm.locked_entry_cm(
+                            expected_type=self.ENTRY_TYPE,
+                            entry_id=entry_id,
+                            duration_sec=30,
+                            wait_timeout_sec=60,
+                        )
+                    )
+                except Exception:
+                    logging_extra.update(us_entry_crawler_exc_stage="entry_locked_load")
+                    raise
+                else:
+                    yield entry
+
+    async def save_entry(self, entry: USEntry, usm: AsyncUSManager) -> None:
+        if self._dry_run:
+            return
+        else:
+            await usm.save(entry)
+
+    async def run(self) -> None:
+        if self._run_fired:
+            raise ValueError("USEntryCrawler.run() can only be called once per crawler instance")
+        self._run_fired = True
+
+        crawler_run_extra: dict[str, Any] = dict(
+            us_entry_crawler_name=self.type_name,
+            us_entry_crawler_dry_run=self._dry_run,
+            us_entry_crawler_run_id=self._run_id,
+        )
+        LOGGER.info(
+            "Starting US entry crawler run: %s",
+            format_dict(
+                crawler_run_extra,
+                name="us_entry_crawler_name",
+                run_id="us_entry_crawler_run_id",
+                is_dry_run="us_entry_crawler_dry_run",
+            ),
+            extra=extra_with_evt_code(EVT_CODE_RUN_START, crawler_run_extra),
+        )
+
+        started_ts = time.monotonic()
+
+        try:
+            map_handling_status_entry_id = await self._run(crawler_run_extra)
+        except Exception:  # noqa
+            crawler_run_extra.update(us_entry_crawler_run_success=False)
+            LOGGER.exception(
+                "Crawler run failure: %s %s",
+                self.type_name,
+                self._run_id,
+                extra=extra_with_evt_code(EVT_CODE_RUN_END, crawler_run_extra),
+            )
+        else:
+            time_elapsed = int(round(time.monotonic() - started_ts))
+            # TODO FIX: Add the ability to define a post-run hook in subclasses
+            counters = {key: len(val) for key, val in map_handling_status_entry_id.items()}
+
+            crawler_run_extra.update(
+                us_entry_crawler_run_success=True,
+                us_entry_crawler_run_cnt_processed=counters[EntryHandlingResult.SUCCESS],
+                us_entry_crawler_run_cnt_skipped=counters[EntryHandlingResult.SKIPPED],
+                us_entry_crawler_run_cnt_failed=counters[EntryHandlingResult.FAILED],
+                us_entry_crawler_run_cnt_total=sum(counters.values()),
+                us_entry_crawler_run_time_elapsed=time_elapsed,
+            )
+
+            LOGGER.info(
+                "Crawler run finished successfully: %s",
+                format_dict(
+                    crawler_run_extra,
+                    total="us_entry_crawler_run_cnt_total",
+                    processed="us_entry_crawler_run_cnt_processed",
+                    skipped="us_entry_crawler_run_cnt_skipped",
+                    failed="us_entry_crawler_run_cnt_failed",
+                    time_elapsed="us_entry_crawler_run_time_elapsed",
+                ),
+                extra=extra_with_evt_code(EVT_CODE_RUN_END, crawler_run_extra),
+            )
+
+    async def _run(self, crawler_run_extra: dict[str, Any]) -> dict[EntryHandlingResult, Sequence[str]]:
+        entry_id_distribution: dict[EntryHandlingResult, list[str]] = {
+            result_code: [] for result_code in EntryHandlingResult
+        }
+
+        await self._task_runner.initialize()
+
+        entry_idx = 0
+        async for raw_entry in self.get_raw_entry_iterator(crawl_all_tenants=(self._target_tenant is None)):
+            await self._task_runner.schedule(
+                self.single_entry_run(
+                    entry_idx=entry_idx,
+                    raw_entry=raw_entry,
+                    crawler_run_extra=crawler_run_extra,
+                    entry_id_distribution=entry_id_distribution,
+                )
+            )
+
+            entry_idx += 1
+
+        await self._task_runner.finalize()
+
+        return entry_id_distribution  # type: ignore
+
+    async def single_entry_run(
+        self,
+        entry_idx: int,
+        raw_entry: dict,
+        crawler_run_extra: dict,
+        entry_id_distribution: dict[EntryHandlingResult, list[str]],
+    ) -> None:
+        entry_id = raw_entry["entryId"]
+        entry_handling_extra = dict(
+            crawler_run_extra,
+            us_entry_id=entry_id,
+            us_entry_key=raw_entry["key"],
+            us_entry_tenant_id=raw_entry.get("tenantId", None),
+            us_entry_type=raw_entry["type"],
+            us_entry_scope=raw_entry["scope"],
+            us_entry_crawler_idx=entry_idx,
+        )
+        LOGGER.info(
+            "Crawler is going to handle entry: %s",
+            entry_id,
+            extra=extra_with_evt_code(EVT_CODE_ENTRY_HANDLING_START, entry_handling_extra),
+        )
+        try:
+            result = await self._handle_single_entry(
+                raw_entry=raw_entry,
+                entry_handling_extra=entry_handling_extra,
+            )
+            entry_handling_extra.update(entry_handling_status=result.name)
+        except Exception:  # noqa
+            result = EntryHandlingResult.FAILED
+            entry_handling_extra.update(entry_handling_status=result.name)
+            LOGGER.exception(
+                "Unexpected exception while handling entry %s",
+                entry_id,
+                extra=extra_with_evt_code(EVT_CODE_ENTRY_HANDLING_END, entry_handling_extra),
+            )
+        else:
+            LOGGER.info(
+                "Crawler completed entry handling: %s %s",
+                entry_id,
+                result.name,
+                extra=extra_with_evt_code(EVT_CODE_ENTRY_HANDLING_END, entry_handling_extra),
+            )
+
+        entry_id_distribution[result].append(entry_id)
+
+    async def _handle_single_entry(
+        self, raw_entry: dict[str, Any], entry_handling_extra: dict[str, Any]
+    ) -> EntryHandlingResult:
+        entry_id = raw_entry["entryId"]
+        usm = self.usm
+        async with self.locked_entry_cm(entry_id, entry_handling_extra, usm=usm) as target_entry:  # type: ignore # TODO: fix
+            assert target_entry is not None
+            # For future diff calculation reliability
+            # (see `dl_maintenance.diff_utils.get_pre_save_top_level_dict`)
+            assert target_entry._us_resp is not None
+            us_resp: dict = copy.deepcopy(target_entry._us_resp)
+            target_entry._us_resp = us_resp
+
+            entry_handling_extra.update(
+                us_entry_rev=us_resp["revId"],
+            )
+            try:
+                need_save, processing_msg = await self.process_entry_get_save_flag(
+                    target_entry,
+                    logging_extra=dict(entry_handling_extra),
+                    usm=usm,
+                )
+            except Exception:
+                entry_handling_extra.update(us_entry_crawler_exc_stage="entry_processing")
+                raise
+
+            target_entry_diff_str = self._calculate_diff_str(target_entry, entry_handling_extra)
+            entry_handling_extra.update(
+                us_entry_crawler_need_save=need_save,
+                us_entry_crawler_proc_msg=processing_msg,
+                us_entry_crawler_entry_diff=target_entry_diff_str,
+            )
+
+            diff_str = get_diff_text(target_entry, us_manager=usm)
+            if diff_str:
+                diff_str = "\n" + diff_str
+            LOGGER.info(
+                "Entry was processed by crawler: %s%s",
+                format_dict(
+                    entry_handling_extra,
+                    idx="us_entry_crawler_idx",
+                    entry_id="us_entry_id",
+                    scope="us_entry_scope",
+                    type="us_entry_type",
+                    need_save="us_entry_crawler_need_save",
+                    msg="us_entry_crawler_proc_msg",
+                    diff="us_entry_crawler_entry_diff",
+                ),
+                diff_str,
+                extra=extra_with_evt_code(EVT_CODE_ENTRY_PROCESSING_SUCCESS, entry_handling_extra),
+            )
+
+            try:
+                if need_save:
+                    await self.save_entry(target_entry, usm=usm)
+                    return EntryHandlingResult.SUCCESS
+                else:
+                    return EntryHandlingResult.SKIPPED
+
+            except Exception:
+                entry_handling_extra.update(us_entry_crawler_exc_stage="entry_save")
+                raise
+
+    @staticmethod
+    def _calculate_diff_str(target_entry: USEntry, entry_handling_extra: dict[str, Any]) -> Optional[str]:
+        try:
+            if isinstance(target_entry, USMigrationEntry):
+                entry_diff = get_pre_save_top_level_dict(target_entry)
+            else:
+                return "N/A"
+        except Exception:  # noqa
+            LOGGER.warning(
+                "Exception during diff calculation",
+                extra=extra_with_evt_code(EVT_CODE_DIFF_CALC_EXC, entry_handling_extra),
+            )
+            return "N/A"
+
+        try:
+            return entry_diff.short_str()
+        except Exception:  # noqa
+            LOGGER.warning(
+                "Cannot pretty-stringify diff for entry: %s",
+                target_entry.uuid,
+                extra=extra_with_evt_code(EVT_CODE_DIFF_CALC_EXC, entry_handling_extra),
+            )
+            return str(entry_diff)
diff --git a/lib/dl_maintenance/dl_maintenance/diff_utils.py b/lib/dl_maintenance/dl_maintenance/diff_utils.py
new file mode 100644
index 000000000..da1b8fe2b
--- /dev/null
+++ b/lib/dl_maintenance/dl_maintenance/diff_utils.py
@@ -0,0 +1,190 @@
+from __future__ import annotations
+
+import difflib
+from typing import (
+    Any,
+    Dict,
+    Iterable,
+    List,
+    Optional,
+    Sequence,
+)
+
+import attr
+import yaml
+
+from dl_core.us_entry import (
+    USEntry,
+    USMigrationEntry,
+)
+from dl_core.us_manager.us_manager import USManagerBase
+
+
+@attr.s(frozen=True, auto_attribs=True)
+class DictFieldsDiff:
+    added: Sequence[str]
+    modified: Sequence[str]
+    removed: Sequence[str]
+
+    def __bool__(self):  # type: ignore # TODO: fix
+        return any(attr.asdict(self).values())
+
+    def short_str(self):  # type: ignore # TODO: fix
+        parts = [f"{f_name}={f_val}" for f_name, f_val in attr.asdict(self).items() if f_val]
+        return f"({';'.join(parts)})"
+
+
+@attr.s(frozen=True, auto_attribs=True)
+class EntryFieldsDiff:
+    data: DictFieldsDiff
+    unversioned_data: DictFieldsDiff
+    meta: DictFieldsDiff
+
+    def short_str(self):  # type: ignore # TODO: fix
+        parts = [f"{f_name}={f_val.short_str()}" for f_name, f_val in attr.asdict(self, recurse=False).items() if f_val]
+        return f"Diff({' '.join(parts)})"
+
+    def __bool__(self):  # type: ignore # TODO: fix
+        return any(attr.asdict(self).values())
+
+
+def get_dict_top_level_diff(a: Dict[str, Any], b: Dict[str, Any]) -> DictFieldsDiff:
+    a_fields = set(a.keys())
+    b_fields = set(b.keys())
+
+    return DictFieldsDiff(
+        removed=tuple(sorted(a_fields - b_fields)),
+        added=tuple(sorted(b_fields - a_fields)),
+        modified=tuple(sorted(field for field in a_fields & b_fields if a[field] != b[field])),
+    )
+
+
+def get_pre_save_top_level_dict(entry: USMigrationEntry):  # type: ignore # TODO: fix
+    us_resp = entry._us_resp
+    return EntryFieldsDiff(
+        data=get_dict_top_level_diff(us_resp["data"], entry.data),  # type: ignore # TODO: fix
+        unversioned_data=get_dict_top_level_diff(us_resp.get("unversionedData") or {}, entry.unversioned_data),  # type: ignore # TODO: fix
+        meta=get_dict_top_level_diff(us_resp["meta"], entry.meta),  # type: ignore # TODO: fix
+    )
+
+
+def _colorize_diff(text: str) -> str:
+    from pygments import (
+        formatters,
+        highlight,
+        lexers,
+    )
+
+    return highlight(
+        text,
+        lexer=lexers.DiffLexer(),  # type: ignore # TODO: fix
+        # formatter=formatters.TerminalFormatter(),
+        # Try also:
+        formatter=formatters.Terminal256Formatter(),  # type: ignore # TODO: fix
+    )
+
+
+def dump_yaml_for_diff(value: Any) -> List[str]:
+    return yaml.safe_dump(
+        value,
+        default_flow_style=False,
+        allow_unicode=True,
+        encoding=None,
+    ).splitlines()
+
+
+# https://github.com/venthur/python-gron (MIT)
+# minimized
+def 
dump_gron( + value: Any, + root_name: str = "", + init_values: bool = False, + line_tpl: str = "{name} = {value};", + sort_keys: bool = False, +) -> Iterable[str]: + def quote_key(key: Any) -> str: + return f".{key}" + + def gron_walk(node: Any, name: str) -> Iterable[str]: + if node is None: + yield line_tpl.format(name=name, value="null") + return + if isinstance(node, bool): + yield line_tpl.format(name=name, value="true" if node else "false") + return + if isinstance(node, (str, bytes)): + yield line_tpl.format(name=name, value='"{}"'.format(node)) # type: ignore # TODO: fix + return + if isinstance(node, dict): + if init_values: + yield line_tpl.format(name=name, value="{}") + items = node.items() + if sort_keys: + items = sorted(items) # type: ignore # TODO: fix + for key, value in items: + children = gron_walk(value, name="{}{}".format(name, quote_key(key))) + for child in children: + yield child + return + if isinstance(node, (list, tuple)): + if init_values: + yield line_tpl.format(name=name, value="[]") + for idx, value in enumerate(node): + children = gron_walk(value, name="{}[{}]".format(name, idx)) + for child in children: + yield child + return + yield line_tpl.format( + name=name, + value=str(node), + ) + return + + return gron_walk(value, name=root_name) + + +def make_diff( # type: ignore # TODO: fix + value_a, value_b, unified_n: int = 1, dumper=lambda value: dump_gron(value, sort_keys=True), colorize: bool = True +): + value_a_lines = list(dumper(value_a)) + value_b_lines = list(dumper(value_b)) + diff_lines = difflib.unified_diff( + value_a_lines, + value_b_lines, + n=unified_n, + ) + result = "\n".join(line for line in diff_lines if not line.startswith("---") and not line.startswith("+++")) + result = result.strip() + + if colorize: + try: + result = _colorize_diff(result).strip() + except Exception: + pass + return result + + +def get_diff_text( # type: ignore # TODO: fix + entry: USEntry, + us_manager: USManagerBase, + unified_n: int = 1, # recommended: 1 for gron, 5 for yaml + dumper=dump_gron, + colorize: bool = True, +) -> Optional[str]: + if not isinstance(entry, USMigrationEntry): + return None + + us_resp = entry._us_resp + value_a = dict( + data=dict(us_resp["data"]), # type: ignore # TODO: fix + unversioned_data=dict(us_resp.get("unversionedData") or {}), # type: ignore # TODO: fix + meta=us_resp["meta"], # type: ignore # TODO: fix + ) + value_b = dict( + data=dict(us_manager.dump_data(entry)), + unversioned_data=dict(getattr(entry, "unversioned_data", {})), + meta=dict(entry.meta), + ) + + result = make_diff(value_a=value_a, value_b=value_b, unified_n=unified_n, dumper=dumper, colorize=colorize) + return result diff --git a/lib/dl_maintenance/dl_maintenance/py.typed b/lib/dl_maintenance/dl_maintenance/py.typed new file mode 100644 index 000000000..e69de29bb diff --git a/lib/dl_maintenance/dl_maintenance_tests/__init__.py b/lib/dl_maintenance/dl_maintenance_tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lib/dl_maintenance/dl_maintenance_tests/unit/__init__.py b/lib/dl_maintenance/dl_maintenance_tests/unit/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lib/dl_maintenance/dl_maintenance_tests/unit/conftest.py b/lib/dl_maintenance/dl_maintenance_tests/unit/conftest.py new file mode 100644 index 000000000..e69de29bb diff --git a/lib/dl_maintenance/pyproject.toml b/lib/dl_maintenance/pyproject.toml new file mode 100644 index 000000000..874c5ff5f --- /dev/null +++ b/lib/dl_maintenance/pyproject.toml @@ -0,0 
+1,48 @@ + +[tool.poetry] +name = "datalens-maintenance" +version = "0.0.1" +description = "" +authors = ["DataLens Team "] +packages = [{include = "dl_maintenance"}] +license = "Apache 2.0" +readme = "README.md" + + +[tool.poetry.dependencies] +attrs = ">=22.2.0" +pygments = ">=2.4.0" +python = ">=3.10, <3.12" +pyyaml = ">=5.3.1" +shortuuid = ">=1.0.11" +tabulate = ">=0.9.0" +datalens-api-commons = {path = "../dl_api_commons"} +datalens-utils = {path = "../dl_utils"} +datalens-constants = {path = "../dl_constants"} +datalens-configs = {path = "../dl_configs"} +datalens-core = {path = "../dl_core"} +datalens-app-tools = {path = "../dl_app_tools"} +datalens-query-processing = {path = "../dl_query_processing"} +datalens-api-lib = {path = "../dl_api_lib"} + +[tool.poetry.group.tests.dependencies] +pytest = ">=7.2.2" +[build-system] +build-backend = "poetry.core.masonry.api" +requires = [ + "poetry-core", +] + +[tool.pytest.ini_options] +minversion = "6.0" +addopts = "-ra" +testpaths = [] + +[datalens_ci] +skip_test = true + +[tool.mypy] +warn_unused_configs = true +disallow_untyped_defs = true +check_untyped_defs = true +strict_optional = true diff --git a/metapkg/poetry.lock b/metapkg/poetry.lock index 6a9633917..5ab0b336a 100644 --- a/metapkg/poetry.lock +++ b/metapkg/poetry.lock @@ -1915,6 +1915,34 @@ attrs = ">=22.2.0" type = "directory" url = "../lib/dl_i18n" +[[package]] +name = "datalens-maintenance" +version = "0.0.1" +description = "" +optional = false +python-versions = ">=3.10, <3.12" +files = [] +develop = false + +[package.dependencies] +attrs = ">=22.2.0" +datalens-api-commons = {path = "../dl_api_commons"} +datalens-api-lib = {path = "../dl_api_lib"} +datalens-app-tools = {path = "../dl_app_tools"} +datalens-configs = {path = "../dl_configs"} +datalens-constants = {path = "../dl_constants"} +datalens-core = {path = "../dl_core"} +datalens-query-processing = {path = "../dl_query_processing"} +datalens-utils = {path = "../dl_utils"} +pygments = ">=2.4.0" +pyyaml = ">=5.3.1" +shortuuid = ">=1.0.11" +tabulate = ">=0.9.0" + +[package.source] +type = "directory" +url = "../lib/dl_maintenance" + [[package]] name = "datalens-model-tools" version = "0.0.1" @@ -2893,6 +2921,7 @@ files = [ {file = "greenlet-2.0.2-cp27-cp27m-win32.whl", hash = "sha256:6c3acb79b0bfd4fe733dff8bc62695283b57949ebcca05ae5c129eb606ff2d74"}, {file = "greenlet-2.0.2-cp27-cp27m-win_amd64.whl", hash = "sha256:283737e0da3f08bd637b5ad058507e578dd462db259f7f6e4c5c365ba4ee9343"}, {file = "greenlet-2.0.2-cp27-cp27mu-manylinux2010_x86_64.whl", hash = "sha256:d27ec7509b9c18b6d73f2f5ede2622441de812e7b1a80bbd446cb0633bd3d5ae"}, + {file = "greenlet-2.0.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:d967650d3f56af314b72df7089d96cda1083a7fc2da05b375d2bc48c82ab3f3c"}, {file = "greenlet-2.0.2-cp310-cp310-macosx_11_0_x86_64.whl", hash = "sha256:30bcf80dda7f15ac77ba5af2b961bdd9dbc77fd4ac6105cee85b0d0a5fcf74df"}, {file = "greenlet-2.0.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:26fbfce90728d82bc9e6c38ea4d038cba20b7faf8a0ca53a9c07b67318d46088"}, {file = "greenlet-2.0.2-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:9190f09060ea4debddd24665d6804b995a9c122ef5917ab26e1566dcc712ceeb"}, @@ -2901,6 +2930,7 @@ files = [ {file = "greenlet-2.0.2-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:76ae285c8104046b3a7f06b42f29c7b73f77683df18c49ab5af7983994c2dd91"}, {file = "greenlet-2.0.2-cp310-cp310-win_amd64.whl", hash = 
"sha256:2d4686f195e32d36b4d7cf2d166857dbd0ee9f3d20ae349b6bf8afc8485b3645"}, {file = "greenlet-2.0.2-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:c4302695ad8027363e96311df24ee28978162cdcdd2006476c43970b384a244c"}, + {file = "greenlet-2.0.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:d4606a527e30548153be1a9f155f4e283d109ffba663a15856089fb55f933e47"}, {file = "greenlet-2.0.2-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c48f54ef8e05f04d6eff74b8233f6063cb1ed960243eacc474ee73a2ea8573ca"}, {file = "greenlet-2.0.2-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:a1846f1b999e78e13837c93c778dcfc3365902cfb8d1bdb7dd73ead37059f0d0"}, {file = "greenlet-2.0.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3a06ad5312349fec0ab944664b01d26f8d1f05009566339ac6f63f56589bc1a2"}, @@ -2930,6 +2960,7 @@ files = [ {file = "greenlet-2.0.2-cp37-cp37m-win32.whl", hash = "sha256:3f6ea9bd35eb450837a3d80e77b517ea5bc56b4647f5502cd28de13675ee12f7"}, {file = "greenlet-2.0.2-cp37-cp37m-win_amd64.whl", hash = "sha256:7492e2b7bd7c9b9916388d9df23fa49d9b88ac0640db0a5b4ecc2b653bf451e3"}, {file = "greenlet-2.0.2-cp38-cp38-macosx_10_15_x86_64.whl", hash = "sha256:b864ba53912b6c3ab6bcb2beb19f19edd01a6bfcbdfe1f37ddd1778abfe75a30"}, + {file = "greenlet-2.0.2-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:1087300cf9700bbf455b1b97e24db18f2f77b55302a68272c56209d5587c12d1"}, {file = "greenlet-2.0.2-cp38-cp38-manylinux2010_x86_64.whl", hash = "sha256:ba2956617f1c42598a308a84c6cf021a90ff3862eddafd20c3333d50f0edb45b"}, {file = "greenlet-2.0.2-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:fc3a569657468b6f3fb60587e48356fe512c1754ca05a564f11366ac9e306526"}, {file = "greenlet-2.0.2-cp38-cp38-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:8eab883b3b2a38cc1e050819ef06a7e6344d4a990d24d45bc6f2cf959045a45b"}, @@ -2938,6 +2969,7 @@ files = [ {file = "greenlet-2.0.2-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:b0ef99cdbe2b682b9ccbb964743a6aca37905fda5e0452e5ee239b1654d37f2a"}, {file = "greenlet-2.0.2-cp38-cp38-win32.whl", hash = "sha256:b80f600eddddce72320dbbc8e3784d16bd3fb7b517e82476d8da921f27d4b249"}, {file = "greenlet-2.0.2-cp38-cp38-win_amd64.whl", hash = "sha256:4d2e11331fc0c02b6e84b0d28ece3a36e0548ee1a1ce9ddde03752d9b79bba40"}, + {file = "greenlet-2.0.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:8512a0c38cfd4e66a858ddd1b17705587900dd760c6003998e9472b77b56d417"}, {file = "greenlet-2.0.2-cp39-cp39-macosx_11_0_x86_64.whl", hash = "sha256:88d9ab96491d38a5ab7c56dd7a3cc37d83336ecc564e4e8816dbed12e5aaefc8"}, {file = "greenlet-2.0.2-cp39-cp39-manylinux2010_x86_64.whl", hash = "sha256:561091a7be172ab497a3527602d467e2b3fbe75f9e783d8b8ce403fa414f71a6"}, {file = "greenlet-2.0.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:971ce5e14dc5e73715755d0ca2975ac88cfdaefcaab078a284fea6cfabf866df"}, @@ -3511,6 +3543,16 @@ files = [ {file = "MarkupSafe-2.1.3-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:5bbe06f8eeafd38e5d0a4894ffec89378b6c6a625ff57e3028921f8ff59318ac"}, {file = "MarkupSafe-2.1.3-cp311-cp311-win32.whl", hash = "sha256:dd15ff04ffd7e05ffcb7fe79f1b98041b8ea30ae9234aed2a9168b5797c3effb"}, {file = "MarkupSafe-2.1.3-cp311-cp311-win_amd64.whl", hash = "sha256:134da1eca9ec0ae528110ccc9e48041e0828d79f24121a1a146161103c76e686"}, + {file = "MarkupSafe-2.1.3-cp312-cp312-macosx_10_9_universal2.whl", hash = 
"sha256:f698de3fd0c4e6972b92290a45bd9b1536bffe8c6759c62471efaa8acb4c37bc"}, + {file = "MarkupSafe-2.1.3-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:aa57bd9cf8ae831a362185ee444e15a93ecb2e344c8e52e4d721ea3ab6ef1823"}, + {file = "MarkupSafe-2.1.3-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ffcc3f7c66b5f5b7931a5aa68fc9cecc51e685ef90282f4a82f0f5e9b704ad11"}, + {file = "MarkupSafe-2.1.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:47d4f1c5f80fc62fdd7777d0d40a2e9dda0a05883ab11374334f6c4de38adffd"}, + {file = "MarkupSafe-2.1.3-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1f67c7038d560d92149c060157d623c542173016c4babc0c1913cca0564b9939"}, + {file = "MarkupSafe-2.1.3-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:9aad3c1755095ce347e26488214ef77e0485a3c34a50c5a5e2471dff60b9dd9c"}, + {file = "MarkupSafe-2.1.3-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:14ff806850827afd6b07a5f32bd917fb7f45b046ba40c57abdb636674a8b559c"}, + {file = "MarkupSafe-2.1.3-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8f9293864fe09b8149f0cc42ce56e3f0e54de883a9de90cd427f191c346eb2e1"}, + {file = "MarkupSafe-2.1.3-cp312-cp312-win32.whl", hash = "sha256:715d3562f79d540f251b99ebd6d8baa547118974341db04f5ad06d5ea3eb8007"}, + {file = "MarkupSafe-2.1.3-cp312-cp312-win_amd64.whl", hash = "sha256:1b8dd8c3fd14349433c79fa8abeb573a55fc0fdd769133baac1f5e07abf54aeb"}, {file = "MarkupSafe-2.1.3-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:8e254ae696c88d98da6555f5ace2279cf7cd5b3f52be2b5cf97feafe883b58d2"}, {file = "MarkupSafe-2.1.3-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:cb0932dc158471523c9637e807d9bfb93e06a95cbf010f1a38b98623b929ef2b"}, {file = "MarkupSafe-2.1.3-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9402b03f1a1b4dc4c19845e5c749e3ab82d5078d16a2a4c2cd2df62d57bb0707"}, @@ -6048,4 +6090,4 @@ testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more [metadata] lock-version = "2.0" python-versions = ">=3.10, <3.12" -content-hash = "ba47c87a3ba681481552ba36c78609734cfdfb3be94f8de537ce802e098e1485" +content-hash = "29d3c748752ba2eec74eff6fed316a23a978150fd4bcc35f902edaf971435bbc" diff --git a/metapkg/pyproject.toml b/metapkg/pyproject.toml index bb4c8b83a..25c18bcd9 100644 --- a/metapkg/pyproject.toml +++ b/metapkg/pyproject.toml @@ -134,6 +134,7 @@ datalens-task-processor = {path = "../lib/dl_task_processor"} datalens-connector-oracle = {path = "../lib/dl_connector_oracle"} datalens-connector-mysql = {path = "../lib/dl_connector_mysql"} datalens-sqlalchemy-mysql = {path = "../lib/dl_sqlalchemy_mysql"} +datalens-maintenance = {path = "../lib/dl_maintenance"} [tool.poetry.group.dev.dependencies] black = "==23.3.0"