diff --git a/src/spaceone/inventory_v2/interface/grpc/__init__.py b/src/spaceone/inventory_v2/interface/grpc/__init__.py index 444bcfd..6ff8afd 100644 --- a/src/spaceone/inventory_v2/interface/grpc/__init__.py +++ b/src/spaceone/inventory_v2/interface/grpc/__init__.py @@ -1,7 +1,9 @@ from spaceone.core.pygrpc.server import GRPCServer from .region import Region +from .collector import Collector _all_ = ["app"] app = GRPCServer() -app.add_service(Region) \ No newline at end of file +app.add_service(Region) +app.add_service(Collector) diff --git a/src/spaceone/inventory_v2/interface/grpc/collector.py b/src/spaceone/inventory_v2/interface/grpc/collector.py new file mode 100644 index 0000000..52627d5 --- /dev/null +++ b/src/spaceone/inventory_v2/interface/grpc/collector.py @@ -0,0 +1,66 @@ +from google.protobuf.json_format import ParseDict +from spaceone.api.inventory_v2.v1 import collector_pb2, collector_pb2_grpc +from spaceone.api.inventory_v2.v1 import job_pb2 +from spaceone.core.pygrpc import BaseAPI + +from spaceone.inventory_v2.service.collector_service import CollectorService + + +class Collector(BaseAPI, collector_pb2_grpc.CollectorServicer): + pb2 = collector_pb2 + pb2_grpc = collector_pb2_grpc + + def create(self, request, context): + params, metadata = self.parse_request(request, context) + collector_svc = CollectorService(metadata) + response: dict = collector_svc.create(params) + return self.dict_to_message(response) + + def update(self, request, context): + params, metadata = self.parse_request(request, context) + collector_svc = CollectorService(metadata) + response: dict = collector_svc.update(params) + return self.dict_to_message(response) + + def delete(self, request, context): + params, metadata = self.parse_request(request, context) + collector_svc = CollectorService(metadata) + collector_svc.delete(params) + return self.empty() + + def get(self, request, context): + params, metadata = self.parse_request(request, context) + + collector_svc = 
CollectorService(metadata) + response: dict = collector_svc.get(params) + return self.dict_to_message(response) + + def list(self, request, context): + params, metadata = self.parse_request(request, context) + collector_svc = CollectorService(metadata) + response: dict = collector_svc.list(params) + return self.dict_to_message(response) + + def stat(self, request, context): + params, metadata = self.parse_request(request, context) + collector_svc = CollectorService(metadata) + response: dict = collector_svc.stat(params) + return self.dict_to_message(response) + + def collect(self, request, context): + params, metadata = self.parse_request(request, context) + collector_svc = CollectorService(metadata) + response: dict = collector_svc.collect(params) + return ParseDict(response, job_pb2.JobInfo()) + + def update_plugin(self, request, context): + params, metadata = self.parse_request(request, context) + collector_svc = CollectorService(metadata) + response: dict = collector_svc.update_plugin(params) + return self.dict_to_message(response) + + def verify_plugin(self, request, context): + params, metadata = self.parse_request(request, context) + collector_svc = CollectorService(metadata) + collector_svc.verify_plugin(params) + return self.empty() diff --git a/src/spaceone/inventory_v2/manager/collection_state_manager.py b/src/spaceone/inventory_v2/manager/collection_state_manager.py new file mode 100644 index 0000000..bbe8208 --- /dev/null +++ b/src/spaceone/inventory_v2/manager/collection_state_manager.py @@ -0,0 +1,100 @@ +import logging +from typing import Union, Tuple, List + +from spaceone.core.model.mongo_model import QuerySet +from spaceone.core.manager import BaseManager +from spaceone.inventory_v2.model.collection_state.database import CollectionState + +_LOGGER = logging.getLogger(__name__) + + +class CollectionStateManager(BaseManager): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.collector_id = 
self.transaction.get_meta("collector_id") + self.job_task_id = self.transaction.get_meta("job_task_id") + self.secret_id = self.transaction.get_meta("secret.secret_id") + self.collection_state_model = CollectionState + + def create_collection_state(self, cloud_service_id: str, domain_id: str) -> None: + def _rollback(vo: CollectionState): + _LOGGER.info( + f"[ROLLBACK] Delete collection state: cloud_service_id = {vo.cloud_service_id}, " + f"collector_id = {vo.collector_id}" + ) + vo.terminate() + + if self.collector_id and self.job_task_id and self.secret_id: + state_data = { + "collector_id": self.collector_id, + "job_task_id": self.job_task_id, + "secret_id": self.secret_id, + "cloud_service_id": cloud_service_id, + "domain_id": domain_id, + } + + state_vo = self.collection_state_model.create(state_data) + self.transaction.add_rollback(_rollback, state_vo) + + def update_collection_state_by_vo( + self, params: dict, state_vo: CollectionState + ) -> CollectionState: + def _rollback(old_data): + _LOGGER.info( + f"[ROLLBACK] Revert collection state : cloud_service_id = {state_vo.cloud_service_id}, " + f"collector_id = {state_vo.collector_id}" + ) + state_vo.update(old_data) + + self.transaction.add_rollback(_rollback, state_vo.to_dict()) + return state_vo.update(params) + + def reset_collection_state(self, state_vo: CollectionState) -> None: + if self.job_task_id: + params = {"disconnected_count": 0, "job_task_id": self.job_task_id} + + self.update_collection_state_by_vo(params, state_vo) + + def get_collection_state( + self, cloud_service_id: str, domain_id: str + ) -> Union[CollectionState, None]: + if self.collector_id and self.secret_id: + state_vos = self.collection_state_model.filter( + collector_id=self.collector_id, + secret_id=self.secret_id, + cloud_service_id=cloud_service_id, + domain_id=domain_id, + ) + + if state_vos.count() > 0: + return state_vos[0] + + return None + + def filter_collection_states(self, **conditions) -> QuerySet: + return 
self.collection_state_model.filter(**conditions) + + def list_collection_states(self, query: dict) -> Tuple[QuerySet, int]: + return self.collection_state_model.query(**query) + + def delete_collection_state_by_cloud_service_id( + self, resource_id: str, domain_id: str + ) -> None: + state_vos = self.filter_collection_states( + cloud_service_id=resource_id, domain_id=domain_id + ) + state_vos.delete() + + def delete_collection_state_by_cloud_service_ids( + self, cloud_service_ids: List[str] + ) -> None: + state_vos = self.filter_collection_states(cloud_service_id=cloud_service_ids) + state_vos.delete() + + def delete_collection_state_by_collector_id( + self, collector_id: str, domain_id: str + ) -> None: + state_vos = self.filter_collection_states( + collector_id=collector_id, domain_id=domain_id + ) + state_vos.delete() diff --git a/src/spaceone/inventory_v2/manager/collector_manager.py b/src/spaceone/inventory_v2/manager/collector_manager.py new file mode 100644 index 0000000..90b19c8 --- /dev/null +++ b/src/spaceone/inventory_v2/manager/collector_manager.py @@ -0,0 +1,90 @@ +import logging +from typing import Tuple, Union +from datetime import datetime +from spaceone.core.manager import BaseManager +from spaceone.core.model.mongo_model import QuerySet +from spaceone.inventory_v2.model.collector.database import Collector + +__ALL__ = ["CollectorManager"] + +_LOGGER = logging.getLogger(__name__) + + +class CollectorManager(BaseManager): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.collector_model = Collector() + + def create_collector(self, params: dict) -> Collector: + def _rollback(vo: Collector): + _LOGGER.info(f"[ROLLBACK] Delete collector : {vo.name} ({vo.collector_id})") + vo.delete() + + collector_vo: Collector = self.collector_model.create(params) + self.transaction.add_rollback(_rollback, collector_vo) + return collector_vo + + def update_collector_by_vo( + self, params: dict, collector_vo: Collector + ) -> Collector: 
+ def _rollback(old_data): + _LOGGER.info(f"[ROLLBACK] Revert Data : {old_data.get('collector_id')}") + collector_vo.update(old_data) + + self.transaction.add_rollback(_rollback, collector_vo.to_dict()) + return collector_vo.update(params) + + def enable_collector( + self, collector_id: str, domain_id: str, workspace_id: str = None + ): + collector_vo: Collector = self.collector_model.get( + collector_id=collector_id, domain_id=domain_id, workspace_id=workspace_id + ) + + return self.update_collector_by_vo({"state": "ENABLED"}, collector_vo) + + def disable_collector( + self, collector_id: str, domain_id: str, workspace_id: str = None + ): + collector_vo: Collector = self.collector_model.get( + collector_id=collector_id, domain_id=domain_id, workspace_id=workspace_id + ) + + return self.update_collector_by_vo({"state": "DISABLED"}, collector_vo) + + @staticmethod + def delete_collector_by_vo(collector_vo: Collector) -> None: + collector_vo.delete() + + def get_collector( + self, + collector_id: str, + domain_id: str, + workspace_id: Union[list, str, None] = None, + ) -> Collector: + conditions = { + "collector_id": collector_id, + "domain_id": domain_id, + } + + if workspace_id: + conditions.update({"workspace_id": workspace_id}) + + return self.collector_model.get(**conditions) + + def filter_collector(self, **conditions) -> QuerySet: + return self.collector_model.filter(**conditions) + + def list_collectors(self, query: dict) -> Tuple[QuerySet, int]: + return self.collector_model.query(**query) + + def stat_collectors(self, query: dict) -> dict: + return self.collector_model.stat(**query) + + def update_last_collected_time(self, collector_vo: Collector): + _LOGGER.debug( + f"[update_last_collected_time] updated collected at: {collector_vo.collector_id}" + ) + self.update_collector_by_vo( + {"last_collected_at": datetime.utcnow()}, collector_vo + ) diff --git a/src/spaceone/inventory_v2/manager/collector_plugin_manager.py 
b/src/spaceone/inventory_v2/manager/collector_plugin_manager.py new file mode 100644 index 0000000..9a10b81 --- /dev/null +++ b/src/spaceone/inventory_v2/manager/collector_plugin_manager.py @@ -0,0 +1,49 @@ +import logging +from typing import Generator, Union +from spaceone.core.manager import BaseManager +from spaceone.core.connector.space_connector import SpaceConnector + +__ALL__ = ["CollectorPluginManager"] + +_LOGGER = logging.getLogger(__name__) + + +class CollectorPluginManager(BaseManager): + def init_plugin(self, endpoint: str, options: dict) -> dict: + plugin_connector: SpaceConnector = self.locator.get_connector( + SpaceConnector, endpoint=endpoint, token="NO_TOKEN" + ) + return plugin_connector.dispatch("Collector.init", {"options": options}) + + def verify_plugin(self, endpoint: str, options: dict, secret_data: dict) -> None: + plugin_connector: SpaceConnector = self.locator.get_connector( + SpaceConnector, endpoint=endpoint, token="NO_TOKEN" + ) + params = {"options": options, "secret_data": secret_data} + plugin_connector.dispatch("Collector.verify", params) + + def collect( + self, + endpoint: str, + options: dict, + secret_data: dict, + task_options: dict = None, + ) -> Generator[dict, None, None]: + plugin_connector: SpaceConnector = self.locator.get_connector( + SpaceConnector, endpoint=endpoint, token="NO_TOKEN" + ) + + params = {"options": options, "secret_data": secret_data, "filter": {}} + + if task_options: + params["task_options"] = task_options + + return plugin_connector.dispatch("Collector.collect", params) + + def get_tasks(self, endpoint: str, secret_data: dict, options: dict) -> dict: + plugin_connector: SpaceConnector = self.locator.get_connector( + SpaceConnector, endpoint=endpoint, token="NO_TOKEN" + ) + + params = {"options": options, "secret_data": secret_data} + return plugin_connector.dispatch("Job.get_tasks", params) diff --git a/src/spaceone/inventory_v2/manager/collector_rule_manager.py 
b/src/spaceone/inventory_v2/manager/collector_rule_manager.py new file mode 100644 index 0000000..19a1e22 --- /dev/null +++ b/src/spaceone/inventory_v2/manager/collector_rule_manager.py @@ -0,0 +1,318 @@ +import logging +import functools +from typing import Tuple +from spaceone.core import utils +from spaceone.core.model.mongo_model import QuerySet +from spaceone.core.manager import BaseManager +from spaceone.inventory_v2.manager.identity_manager import IdentityManager +from spaceone.inventory_v2.model.collector_rule.database import ( + CollectorRule, + CollectorRuleCondition, +) + +_LOGGER = logging.getLogger(__name__) + + +class CollectorRuleManager(BaseManager): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.collector_rule_model = CollectorRule + self.identity_mgr = IdentityManager() + self._project_info = {} + self._service_account_info = {} + self._collector_rule_info = {} + + def create_collector_rule(self, params: dict) -> CollectorRule: + def _rollback(vo: CollectorRule): + _LOGGER.info( + f"[create_collector_rule._rollback] Delete event rule : {vo.name} " + f"({vo.collector_rule_id})" + ) + vo.delete() + + collector_rule_vo: CollectorRule = self.collector_rule_model.create(params) + self.transaction.add_rollback(_rollback, collector_rule_vo) + + return collector_rule_vo + + def update_collector_rule_by_vo( + self, params: dict, collector_rule_vo: CollectorRule + ) -> CollectorRule: + def _rollback(old_data): + _LOGGER.info( + f"[update_collector_rule_by_vo._rollback] Revert Data : " + f'{old_data["collector_rule_id"]}' + ) + collector_rule_vo.update(old_data) + + self.transaction.add_rollback(_rollback, collector_rule_vo.to_dict()) + + return collector_rule_vo.update(params) + + @staticmethod + def delete_collector_rule_by_vo(collector_rule_vo: CollectorRule) -> None: + collector_rule_vo.delete() + + def get_collector_rule( + self, collector_rule_id: str, domain_id: str, workspace_id: str = None + ) -> CollectorRule: + 
conditions = { + "collector_rule_id": collector_rule_id, + "domain_id": domain_id, + } + + if workspace_id: + conditions["workspace_id"] = workspace_id + + return self.collector_rule_model.get(**conditions) + + def filter_collector_rules(self, **conditions) -> QuerySet: + return self.collector_rule_model.filter(**conditions) + + def list_collector_rules(self, query: dict) -> Tuple[QuerySet, int]: + return self.collector_rule_model.query(**query) + + def stat_collector_rules(self, query: dict) -> dict: + return self.collector_rule_model.stat(**query) + + def change_cloud_service_data( + self, collector_id: str, domain_id: str, cloud_service_data: dict + ) -> dict: + ( + managed_collector_rule_vos, + custom_collector_rule_vos, + ) = self._get_collector_rules(collector_id, domain_id) + + cloud_service_data = self._apply_collector_rule_to_cloud_service_data( + cloud_service_data, managed_collector_rule_vos, domain_id + ) + + cloud_service_data = self._apply_collector_rule_to_cloud_service_data( + cloud_service_data, custom_collector_rule_vos, domain_id + ) + + return cloud_service_data + + def _apply_collector_rule_to_cloud_service_data( + self, cloud_service_data: dict, collector_rule_vos: QuerySet, domain_id: str + ) -> dict: + for collector_rule_vo in collector_rule_vos: + is_match = self._change_cloud_service_data_by_rule( + cloud_service_data, collector_rule_vo + ) + + if is_match: + cloud_service_data = self._change_cloud_service_data_with_actions( + cloud_service_data, collector_rule_vo.actions, domain_id + ) + + if is_match and collector_rule_vo.options.stop_processing: + break + + return cloud_service_data + + def _change_cloud_service_data_with_actions( + self, cloud_service_data: dict, actions: dict, domain_id: str + ) -> dict: + for action, value in actions.items(): + if action == "change_project": + project_info = self._get_project("project_id", value, domain_id) + + if project_info: + cloud_service_data["project_id"] = project_info["project_id"] + 
cloud_service_data["workspace_id"] = project_info["workspace_id"] + + elif action == "match_project": + source = value["source"] + target_key = value.get("target", "project_id") + target_value = utils.get_dict_value(cloud_service_data, source) + + if target_value: + project_info = self._get_project( + target_key, target_value, domain_id + ) + + if project_info: + cloud_service_data["project_id"] = project_info["project_id"] + cloud_service_data["workspace_id"] = project_info[ + "workspace_id" + ] + + elif action == "match_service_account": + source = value["source"] + target_key = value.get("target", "service_account_id") + target_value = utils.get_dict_value(cloud_service_data, source) + if target_value: + service_account_info = self._get_service_account( + target_key, target_value, domain_id + ) + if service_account_info: + cloud_service_data["service_account_id"] = service_account_info[ + "service_account_id" + ] + cloud_service_data["project_id"] = service_account_info[ + "project_id" + ] + cloud_service_data["workspace_id"] = service_account_info[ + "workspace_id" + ] + + return cloud_service_data + + def _get_service_account( + self, target_key: str, target_value: any, domain_id: str + ) -> dict: + if ( + f"inventory:service-account:{domain_id}:{target_key}:{target_value}" + in self._service_account_info + ): + return self._service_account_info[ + f"inventory:service-account:{domain_id}:{target_key}:{target_value}" + ] + + query = { + "filter": [ + {"k": target_key, "v": target_value, "o": "eq"}, + ], + "only": ["service_account_id", "project_id", "workspace_id"], + } + + query_hash = utils.dict_to_hash(query) + response = self.identity_mgr.list_service_accounts_with_cache( + query, query_hash, domain_id + ) + results = response.get("results", []) + total_count = response.get("total_count", 0) + + service_account_info = None + if total_count > 0: + service_account_info = results[0] + + self._service_account_info[ + 
f"inventory:service-account:{domain_id}:{target_key}:{target_value}" + ] = service_account_info + return service_account_info + + def _get_project(self, target_key: str, target_value: str, domain_id: str) -> dict: + if ( + f"identity:project:{domain_id}:{target_key}:{target_value}" + in self._project_info + ): + return self._project_info[ + f"identity:project:{domain_id}:{target_key}:{target_value}" + ] + + query = { + "filter": [{"k": target_key, "v": target_value, "o": "eq"}], + "only": ["project_id", "workspace_id"], + } + + query_hash = utils.dict_to_hash(query) + response = self.identity_mgr.list_projects_with_cache( + query, query_hash, domain_id + ) + results = response.get("results", []) + total_count = response.get("total_count", 0) + + project_info = None + if total_count > 0: + project_info = results[0] + + self._project_info[ + f"identity:project:{domain_id}:{target_key}:{target_value}" + ] = project_info + return project_info + + def _change_cloud_service_data_by_rule( + self, cloud_service_data: dict, collector_rule_vo: CollectorRule + ) -> bool: + conditions_policy = collector_rule_vo.conditions_policy + + if conditions_policy == "ALWAYS": + return True + else: + results = list( + map( + functools.partial(self._check_condition, cloud_service_data), + collector_rule_vo.conditions, + ) + ) + + if conditions_policy == "ALL": + return all(results) + else: + return any(results) + + @staticmethod + def _check_condition( + cloud_service_data: dict, condition: CollectorRuleCondition + ) -> bool: + cloud_service_value = utils.get_dict_value(cloud_service_data, condition.key) + condition_value = condition.value + operator = condition.operator + + if cloud_service_value is None: + return False + + if operator == "eq": + if cloud_service_value == condition_value: + return True + else: + return False + elif operator == "contain": + if cloud_service_value.lower().find(condition_value.lower()) >= 0: + return True + else: + return False + elif operator == "not": + 
if cloud_service_value != condition_value: + return True + else: + return False + elif operator == "not_contain": + if cloud_service_value.lower().find(condition_value.lower()) < 0: + return True + else: + return False + + return False + + def _get_collector_rules( + self, collector_id: str, domain_id: str + ) -> Tuple[QuerySet, QuerySet]: + if collector_id in self._collector_rule_info: + return self._collector_rule_info[collector_id].get( + "managed", [] + ), self._collector_rule_info[collector_id].get("custom", []) + + managed_query = self._make_collector_rule_query( + collector_id, "MANAGED", domain_id + ) + + managed_collector_rule_vos, total_count = self.list_collector_rules( + managed_query + ) + + custom_query = self._make_collector_rule_query( + collector_id, "CUSTOM", domain_id + ) + custom_collector_rule_vos, total_count = self.list_collector_rules(custom_query) + + self._collector_rule_info[collector_id] = {} + self._collector_rule_info[collector_id]["managed"] = managed_collector_rule_vos + self._collector_rule_info[collector_id]["custom"] = custom_collector_rule_vos + + return managed_collector_rule_vos, custom_collector_rule_vos + + @staticmethod + def _make_collector_rule_query( + collector_id: str, rule_type: str, domain_id: str + ) -> dict: + return { + "filter": [ + {"k": "collector_id", "v": collector_id, "o": "eq"}, + {"k": "domain_id", "v": domain_id, "o": "eq"}, + {"k": "rule_type", "v": rule_type, "o": "eq"}, + ], + "sort": [{"key": "order"}], + } diff --git a/src/spaceone/inventory_v2/manager/identity_manager.py b/src/spaceone/inventory_v2/manager/identity_manager.py new file mode 100644 index 0000000..4f1c7f1 --- /dev/null +++ b/src/spaceone/inventory_v2/manager/identity_manager.py @@ -0,0 +1,181 @@ +import logging +from typing import Union + +from spaceone.core import cache +from spaceone.core import config +from spaceone.core.manager import BaseManager +from spaceone.core.connector.space_connector import SpaceConnector +from 
spaceone.core.auth.jwt.jwt_util import JWTUtil + +_LOGGER = logging.getLogger(__name__) + + +class IdentityManager(BaseManager): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + token = self.transaction.get_meta("token") or kwargs.get("token") + self.token_type = JWTUtil.get_value_from_token(token, "typ") + self.identity_conn: SpaceConnector = self.locator.get_connector( + SpaceConnector, + service="identity", + token=token, + ) + + def get_user(self, domain_id: str, user_id: str) -> dict: + system_token = config.get_global("TOKEN") + response = self.identity_conn.dispatch( + "User.list", + {"user_id": user_id, "state": "ENABLED"}, + x_domain_id=domain_id, + token=system_token, + ) + users_info = response.get("results", []) + if users_info: + return users_info[0] + else: + return {} + + def get_domain_name(self, domain_id: str) -> str: + system_token = config.get_global("TOKEN") + + domain_info = self.identity_conn.dispatch( + "Domain.get", {"domain_id": domain_id}, token=system_token + ) + return domain_info["name"] + + def list_domains(self, params: dict) -> dict: + system_token = config.get_global("TOKEN") + return self.identity_conn.dispatch("Domain.list", params, token=system_token) + + def list_enabled_domain_ids(self) -> list: + system_token = config.get_global("TOKEN") + params = { + "query": { + "filter": [ + {"k": "state", "v": "ENABLED", "o": "eq"}, + ] + } + } + response = self.identity_conn.dispatch( + "Domain.list", + params, + token=system_token, + ) + domains_info = response.get("results", []) + domain_ids = [domain["domain_id"] for domain in domains_info] + return domain_ids + + def check_workspace(self, workspace_id: str, domain_id: str) -> None: + system_token = config.get_global("TOKEN") + + self.identity_conn.dispatch( + "Workspace.check", + {"workspace_id": workspace_id, "domain_id": domain_id}, + token=system_token, + ) + + def list_workspaces(self, params: dict, domain_id: str, token: str = None) -> dict: + if 
self.token_type == "SYSTEM_TOKEN" or token: + return self.identity_conn.dispatch( + "Workspace.list", params, x_domain_id=domain_id, token=token + ) + else: + return self.identity_conn.dispatch("Workspace.list", params) + + def list_workspace_users(self, params: dict, domain_id: str) -> dict: + if self.token_type == "SYSTEM_TOKEN": + return self.identity_conn.dispatch( + "WorkspaceUser.list", params, x_domain_id=domain_id + ) + else: + return self.identity_conn.dispatch("WorkspaceUser.list", params) + + def get_service_account_name_map(self, domain_id: str, workspace_id: str) -> dict: + service_account_name_map = {} + service_accounts = self.list_service_accounts( + { + "filter": [ + {"k": "domain_id", "v": domain_id, "o": "eq"}, + {"k": "workspace_id", "v": workspace_id, "o": "eq"}, + ] + }, + domain_id, + ) + for service_account in service_accounts.get("results", []): + service_account_name_map[service_account["service_account_id"]] = ( + service_account["name"] + ) + return service_account_name_map + + def list_service_accounts(self, query: dict, domain_id: str) -> dict: + if self.token_type == "SYSTEM_TOKEN": + return self.identity_conn.dispatch( + "ServiceAccount.list", {"query": query}, x_domain_id=domain_id + ) + else: + return self.identity_conn.dispatch("ServiceAccount.list", {"query": query}) + + def get_project(self, project_id: str, domain_id: str): + if self.token_type == "SYSTEM_TOKEN": + return self.identity_conn.dispatch( + "Project.get", {"project_id": project_id}, x_domain_id=domain_id + ) + else: + return self.identity_conn.dispatch( + "Project.get", {"project_id": project_id} + ) + + def get_project_name_map(self, domain_id: str, workspace_id: str) -> dict: + project_name_map = {} + params = { + "query": { + "filter": [ + {"k": "domain_id", "v": domain_id, "o": "eq"}, + {"k": "workspace_id", "v": workspace_id, "o": "eq"}, + ] + } + } + + response = self.list_projects( + params=params, + domain_id=domain_id, + ) + for project in 
response.get("results", []): + project_name_map[project["project_id"]] = project["name"] + return project_name_map + + def list_projects(self, params: dict, domain_id: str): + if self.token_type == "SYSTEM_TOKEN": + return self.identity_conn.dispatch( + "Project.list", params, x_domain_id=domain_id + ) + else: + return self.identity_conn.dispatch("Project.list", params) + + def list_project_groups(self, params: dict, domain_id: str) -> dict: + if self.token_type == "SYSTEM_TOKEN": + return self.identity_conn.dispatch( + "ProjectGroup.list", params, x_domain_id=domain_id + ) + else: + return self.identity_conn.dispatch("ProjectGroup.list", params) + + @cache.cacheable( + key="inventory:project:query:{domain_id}:{query_hash}", expire=3600 + ) + def list_projects_with_cache( + self, query: dict, query_hash: str, domain_id: str + ) -> dict: + return self.list_projects({"query": query}, domain_id) + + @cache.cacheable( + key="inventory:service-account:query:{domain_id}:{query_hash}", expire=3600 + ) + def list_service_accounts_with_cache( + self, query: dict, query_hash: str, domain_id: str + ) -> dict: + return self.list_service_accounts(query, domain_id) + + def list_schemas(self, query: dict, domain_id: str) -> dict: + # For general user, use access token + return self.identity_conn.dispatch("Schema.list", {"query": query}) diff --git a/src/spaceone/inventory_v2/manager/job_manager.py b/src/spaceone/inventory_v2/manager/job_manager.py new file mode 100644 index 0000000..7c0140e --- /dev/null +++ b/src/spaceone/inventory_v2/manager/job_manager.py @@ -0,0 +1,246 @@ +import logging +from typing import Tuple, List +from datetime import datetime, timedelta +from spaceone.core import cache, config +from spaceone.core.manager import BaseManager +from spaceone.core.model.mongo_model import QuerySet +from spaceone.inventory_v2.error import * +from spaceone.inventory_v2.model.collector.database import Collector +from spaceone.inventory_v2.model.job.database import Job + +# 
from spaceone.inventory_v2.model.job_task_model import JobTask +# from spaceone.inventory_v2.manager.metric_manager import MetricManager +# from spaceone.inventory_v2.manager.metric_data_manager import MetricDataManager + +_LOGGER = logging.getLogger(__name__) + + +class JobManager(BaseManager): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.job_model = Job + + def create_job(self, collector_vo: Collector, params: dict) -> Job: + """Create Job for collect method + Args: + collector_vo: collector model + params(dict): { + 'secret_id': str, + 'workspace_id': str, + } + Returns: job_vo + """ + + request_workspace_id = params.get("workspace_id") + changed_request_workspace_id = None + if isinstance(request_workspace_id, list): + for workspace_id in request_workspace_id: + if workspace_id != "*": + changed_request_workspace_id = workspace_id + else: + changed_request_workspace_id = request_workspace_id + + job_params = params.copy() + job_params["request_secret_id"] = params.get("secret_id") + job_params["request_workspace_id"] = changed_request_workspace_id + job_params["collector"] = collector_vo + job_params["collector_id"] = collector_vo.collector_id + job_params["resource_group"] = collector_vo.resource_group + job_params["workspace_id"] = collector_vo.workspace_id + job_params["domain_id"] = collector_vo.domain_id + + return self.job_model.create(job_params) + + @staticmethod + def update_job_by_vo(params: dict, job_vo: Job) -> Job: + return job_vo.update(params) + + @staticmethod + def delete_job_by_vo(job_vo: Job) -> None: + job_vo.delete() + + def get_job(self, job_id: str, domain_id: str, workspace_id: str = None) -> Job: + conditions = { + "job_id": job_id, + "domain_id": domain_id, + } + + if workspace_id: + conditions.update({"workspace_id": workspace_id}) + + return self.job_model.get(**conditions) + + def filter_jobs(self, **conditions) -> QuerySet: + return self.job_model.filter(**conditions) + + def list_jobs(self, 
query: dict) -> Tuple[QuerySet, int]:
+        return self.job_model.query(**query)
+
+    def analyze_jobs(self, query: dict) -> dict:
+        return self.job_model.analyze(**query)
+
+    def stat_jobs(self, query: dict) -> dict:
+        return self.job_model.stat(**query)
+
+    def increase_success_tasks(self, job_id: str, domain_id: str) -> None:
+        job_vo: Job = self.get_job(job_id, domain_id)
+        job_vo.increment("success_tasks")
+        self.decrease_remained_tasks_by_vo(job_vo)
+
+    def increase_failure_tasks(self, job_id: str, domain_id: str) -> None:
+        job_vo: Job = self.get_job(job_id, domain_id)
+        job_vo.increment("failure_tasks")
+        self.decrease_remained_tasks_by_vo(job_vo)
+
+    def decrease_remained_tasks_by_vo(self, job_vo: Job) -> None:
+        job_vo = job_vo.decrement("remained_tasks")
+
+        if job_vo.remained_tasks == 0:
+            if job_vo.status == "IN_PROGRESS":
+                if job_vo.failure_tasks > 0:
+                    self.make_failure_by_vo(job_vo)
+                else:
+                    self.make_success_by_vo(job_vo)
+
+            # if self._is_changed(job_vo):  # FIXME(review): re-enable once MetricManager/JobTask are ported to inventory_v2 — their imports above are commented out, so this call raises NameError at runtime
+            #     self._run_metric_queries(job_vo.plugin_id, job_vo.domain_id)
+
+    def _is_changed(self, job_vo: Job) -> bool:
+        job_task_model: JobTask = self.locator.get_model("JobTask")
+        job_task_vos: List[JobTask] = job_task_model.filter(
+            job_id=job_vo.job_id, domain_id=job_vo.domain_id
+        )
+        is_changed = False
+
+        for job_task_vo in job_task_vos:
+            if (
+                job_task_vo.created_count > 0
+                or job_task_vo.updated_count > 0
+                or job_task_vo.deleted_count > 0
+            ):
+                is_changed = True
+                break
+
+        _LOGGER.debug(
+            f"[_is_changed] job_id: {job_vo.job_id}, is_changed: {is_changed}"
+        )
+        return is_changed
+
+    def _run_metric_queries(self, plugin_id: str, domain_id: str) -> None:
+        metric_mgr = MetricManager()
+        recent_metrics = self._get_recent_metrics(domain_id)
+
+        managed_metric_vos = metric_mgr.filter_metrics(
+            is_managed=True, domain_id=domain_id, plugin_id=None
+        )
+        for managed_metric_vo in managed_metric_vos:
+            if managed_metric_vo.is_new or (
+                managed_metric_vo.metric_id in recent_metrics
+            ):
+                
metric_mgr.push_task(managed_metric_vo) + + plugin_metric_vos = metric_mgr.filter_metrics( + is_managed=True, plugin_id=plugin_id, domain_id=domain_id + ) + for plugin_metric_vo in plugin_metric_vos: + if plugin_metric_vo.is_new or ( + plugin_metric_vo.metric_id in recent_metrics + ): + metric_mgr.push_task(plugin_metric_vo) + + @staticmethod + def _get_recent_metrics(domain_id: str) -> List[str]: + metric_data_mgr = MetricDataManager() + metric_cache_ttl = config.get_global("METRIC_QUERY_TTL", 3) + ttl_time = datetime.utcnow() - timedelta(days=metric_cache_ttl) + + query = { + "filter": [ + {"k": "domain_id", "v": domain_id, "o": "eq"}, + {"k": "updated_at", "v": ttl_time, "o": "gte"}, + ] + } + + _LOGGER.debug( + f"[_get_metric_query_history] metric_cache_ttl: {metric_cache_ttl} days" + ) + + history_vos, total_count = metric_data_mgr.list_metric_query_history(query) + recent_metrics = [] + for history_vo in history_vos: + recent_metrics.append(history_vo.metric_id) + + _LOGGER.debug( + f"[_get_metric_query_history] recent_metrics({domain_id}): {recent_metrics}" + ) + + return recent_metrics + + def update_job_timeout_by_hour(self, job_timeout: int, domain_id: str) -> None: + created_at = datetime.utcnow() - timedelta(hours=job_timeout) + query = { + "filter": [ + {"k": "domain_id", "v": domain_id, "o": "eq"}, + {"k": "created_at", "v": created_at, "o": "lt"}, + {"k": "status", "v": "IN_PROGRESS", "o": "eq"}, + ] + } + + job_vos, total_count = self.list_jobs(query) + for job_vo in job_vos: + self.make_failure_by_vo(job_vo) + + def get_duplicate_jobs( + self, + collector_id: str, + domain_id: str, + request_workspace_id: str = None, + request_secret_id: str = None, + ) -> QuerySet: + changed_request_workspace_id = None + if isinstance(request_workspace_id, list): + for workspace_id in request_workspace_id: + if workspace_id != "*": + changed_request_workspace_id = workspace_id + else: + changed_request_workspace_id = request_workspace_id + + query = { + "filter": 
[ + {"k": "domain_id", "v": domain_id, "o": "eq"}, + {"k": "collector_id", "v": collector_id, "o": "eq"}, + {"k": "status", "v": "IN_PROGRESS", "o": "eq"}, + { + "k": "request_workspace_id", + "v": changed_request_workspace_id, + "o": "eq", + }, + {"k": "request_secret_id", "v": request_secret_id, "o": "eq"}, + ] + } + + job_vos, total_count = self.list_jobs(query) + return job_vos + + def make_success_by_vo(self, job_vo: Job) -> None: + self._update_job_status_by_vo(job_vo, "SUCCESS") + + def make_failure_by_vo(self, job_vo: Job) -> None: + self._update_job_status_by_vo(job_vo, "FAILURE") + + def make_canceled_by_vo(self, job_vo: Job) -> None: + _LOGGER.debug(f"[make_canceled_by_vo] cancel job: {job_vo.job_id}") + self._update_job_status_by_vo(job_vo, "CANCELED") + + def check_cancel(self, job_id: str, domain_id: str) -> bool: + job_vo: Job = self.get_job(job_id, domain_id) + return job_vo.status == "CANCELED" + + @staticmethod + def _update_job_status_by_vo(job_vo: Job, status: str) -> Job: + params = {"status": status} + if status in ["SUCCESS", "FAILURE", "CANCELED"]: + params.update({"finished_at": datetime.utcnow()}) + + _LOGGER.debug(f"[update_job_status] job_id: {job_vo.job_id}, status: {status}") + return job_vo.update(params) diff --git a/src/spaceone/inventory_v2/manager/job_task_manager.py b/src/spaceone/inventory_v2/manager/job_task_manager.py new file mode 100644 index 0000000..9c16478 --- /dev/null +++ b/src/spaceone/inventory_v2/manager/job_task_manager.py @@ -0,0 +1,224 @@ +import copy +import logging +import json +from typing import Tuple, Union +from jsonschema import validate +from datetime import datetime +from spaceone.core import config, queue, utils +from spaceone.core.manager import BaseManager +from spaceone.core.scheduler.task_schema import SPACEONE_TASK_SCHEMA +from spaceone.core.model.mongo_model import QuerySet + +from spaceone.inventory_v2.manager.job_manager import JobManager +# from spaceone.inventory.manager.cleanup_manager import 
CleanupManager +from spaceone.inventory_v2.model.job_task.database import JobTask + +_LOGGER = logging.getLogger(__name__) + + +class JobTaskManager(BaseManager): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.job_task_model = JobTask + + def create_job_task(self, params: dict) -> JobTask: + def _rollback(vo: JobTask): + _LOGGER.info(f"[ROLLBACK] Delete job task: {vo.job_task_id}") + vo.delete() + + job_task_vo: JobTask = self.job_task_model.create(params) + self.transaction.add_rollback(_rollback, job_task_vo) + return job_task_vo + + def get( + self, + job_task_id: str, + domain_id: str, + workspace_id: str = None, + user_projects: list = None, + ) -> JobTask: + conditions = { + "job_task_id": job_task_id, + "domain_id": domain_id, + } + + if workspace_id: + conditions["workspace_id"] = workspace_id + + if user_projects: + conditions["project_id"] = user_projects + + return self.job_task_model.get(**conditions) + + def filter_job_tasks(self, **conditions) -> QuerySet: + return self.job_task_model.filter(**conditions) + + def list(self, query: dict) -> Tuple[QuerySet, int]: + return self.job_task_model.query(**query) + + def stat(self, query: dict) -> dict: + return self.job_task_model.stat(**query) + + def push_job_task(self, params: dict) -> None: + task = self.create_task_pipeline(copy.deepcopy(params)) + validate(task, schema=SPACEONE_TASK_SCHEMA) + json_task = json.dumps(task) + queue.put(self.get_queue_name(name="collect_queue"), json_task) + + @staticmethod + def add_error( + job_task_vo: JobTask, + error_code: str, + error_message: str, + additional: dict = None, + ) -> None: + error_info = {"error_code": error_code, "message": str(error_message).strip()} + + if additional: + error_info["additional"] = additional + + job_task_vo.append("errors", error_info) + _LOGGER.error( + f"[add_error] {job_task_vo.job_task_id}: {error_info}", exc_info=True + ) + + @staticmethod + def _update_job_status_by_vo( + job_task_vo: JobTask, 
+ status: str, + started_at: datetime = None, + finished_at: datetime = None, + ) -> None: + params = {"status": status} + + if started_at: + params["started_at"] = started_at + + if finished_at: + params["finished_at"] = finished_at + + _LOGGER.debug( + f"[update_job_status] collector_id: {job_task_vo.collector_id}, " + f"job_task_id: {job_task_vo.job_task_id}, status: {status}" + ) + job_task_vo.update(params) + + def make_inprogress_by_vo( + self, + job_task_vo: JobTask, + ) -> None: + if job_task_vo.status == "PENDING": + self._update_job_status_by_vo( + job_task_vo, + "IN_PROGRESS", + started_at=datetime.utcnow(), + ) + + def make_success_by_vo( + self, + job_task_vo: JobTask, + ) -> None: + self._update_job_status_by_vo( + job_task_vo, + "SUCCESS", + finished_at=datetime.utcnow(), + ) + + def make_failure_by_vo( + self, + job_task_vo: JobTask, + collecting_count_info: dict = None, + ) -> None: + self._update_job_status_by_vo( + job_task_vo, + "FAILURE", + finished_at=datetime.utcnow(), + ) + + self.decrease_remained_sub_tasks(job_task_vo, collecting_count_info) + + def decrease_remained_sub_tasks( + self, job_task_vo: JobTask, collecting_count_info: dict = None + ) -> JobTask: + if collecting_count_info: + self._update_collecting_count_info(job_task_vo, collecting_count_info) + + job_task_vo: JobTask = job_task_vo.decrement("remained_sub_tasks") + if job_task_vo.remained_sub_tasks == 0: + job_mgr: JobManager = self.locator.get_manager(JobManager) + if job_task_vo.status == "IN_PROGRESS": + deleted_resources_info = self._update_disconnected_and_deleted_count( + job_task_vo + ) + self._update_collecting_count_info(job_task_vo, deleted_resources_info) + + self.make_success_by_vo(job_task_vo) + job_mgr.increase_success_tasks( + job_task_vo.job_id, job_task_vo.domain_id + ) + else: + job_mgr.increase_failure_tasks( + job_task_vo.job_id, job_task_vo.domain_id + ) + + return job_task_vo + + @staticmethod + def _update_collecting_count_info( + job_task_vo: JobTask, 
collecting_count_info: dict + ) -> None: + _LOGGER.debug( + f"[_update_collecting_count_info] update collecting count => {utils.dump_json(collecting_count_info)}" + ) + for key, value in collecting_count_info.items(): + if isinstance(value, int) and value > 0: + job_task_vo.increment(key, value) + + # def _update_disconnected_and_deleted_count(self, job_task_vo: JobTask) -> dict: + # try: + # cleanup_mgr: CleanupManager = self.locator.get_manager(CleanupManager) + # return cleanup_mgr.update_disconnected_and_deleted_count( + # job_task_vo.collector_id, + # job_task_vo.secret_id, + # job_task_vo.job_task_id, + # job_task_vo.domain_id, + # ) + # except Exception as e: + # _LOGGER.error(f"[_update_collection_state] failed: {e}") + # return { + # "disconnected_count": 0, + # "deleted_count": 0, + # } + + @staticmethod + def delete_job_task_by_vo(job_task_vo: JobTask) -> None: + job_task_vo.delete() + + @staticmethod + def get_queue_name(name: str = "collect_queue") -> Union[str, None]: + try: + return config.get_global(name) + except Exception as e: + _LOGGER.warning(f"[_get_queue_name] name: {name} is not configured.") + return None + + def create_task_pipeline(self, params: dict) -> dict: + token = self.transaction.meta.get("token") + params["token"] = token + + task = { + "locator": "MANAGER", + "name": "CollectingManager", + "metadata": {}, + "method": "collecting_resources", + "params": {"params": params}, + } + + stp = { + "name": "collecting_resources", + "version": "v1", + "executionEngine": "BaseWorker", + "stages": [task], + } + # _LOGGER.debug(f'[_create_task] tasks: {stp}') + return stp diff --git a/src/spaceone/inventory_v2/manager/plugin_manager.py b/src/spaceone/inventory_v2/manager/plugin_manager.py new file mode 100644 index 0000000..878c537 --- /dev/null +++ b/src/spaceone/inventory_v2/manager/plugin_manager.py @@ -0,0 +1,40 @@ +import logging +from typing import Tuple + +from spaceone.core import config +from spaceone.core.manager import BaseManager 
+from spaceone.core.connector.space_connector import SpaceConnector
+
+__all__ = ["PluginManager"]
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class PluginManager(BaseManager):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.plugin_connector: SpaceConnector = self.locator.get_connector(
+            SpaceConnector, service="plugin"
+        )
+
+    def get_endpoint(
+        self,
+        plugin_id: str,
+        domain_id: str,
+        upgrade_mode: str = "AUTO",
+        version: str = None,
+    ) -> Tuple[str, str]:
+        system_token = config.get_global("TOKEN")
+
+        response = self.plugin_connector.dispatch(
+            "Plugin.get_plugin_endpoint",
+            {
+                "plugin_id": plugin_id,
+                "domain_id": domain_id,
+                "upgrade_mode": upgrade_mode,
+                "version": version,
+            },
+            token=system_token,
+        )
+
+        return response.get("endpoint"), response.get("updated_version")
diff --git a/src/spaceone/inventory_v2/manager/repository_manager.py b/src/spaceone/inventory_v2/manager/repository_manager.py
new file mode 100644
index 0000000..394b0dc
--- /dev/null
+++ b/src/spaceone/inventory_v2/manager/repository_manager.py
@@ -0,0 +1,17 @@
+import logging
+
+from spaceone.core.manager import BaseManager
+from spaceone.core.connector.space_connector import SpaceConnector
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class RepositoryManager(BaseManager):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.repo_connector: SpaceConnector = self.locator.get_connector(
+            SpaceConnector, service="repository"
+        )
+
+    def get_plugin(self, plugin_id: str) -> dict:
+        return self.repo_connector.dispatch("Plugin.get", {"plugin_id": plugin_id})
diff --git a/src/spaceone/inventory_v2/manager/secret_manager.py b/src/spaceone/inventory_v2/manager/secret_manager.py
new file mode 100644
index 0000000..b6de5ab
--- /dev/null
+++ b/src/spaceone/inventory_v2/manager/secret_manager.py
@@ -0,0 +1,50 @@
+import logging
+
+from spaceone.core import config
+from spaceone.core.manager import BaseManager
+from 
spaceone.core.connector.space_connector import SpaceConnector +from spaceone.core.auth.jwt.jwt_util import JWTUtil + +_LOGGER = logging.getLogger(__name__) + + +class SecretManager(BaseManager): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + token = self.transaction.get_meta("token") + self.token_type = JWTUtil.get_value_from_token(token, "typ") + + self.secret_connector: SpaceConnector = self.locator.get_connector( + SpaceConnector, service="secret" + ) + + def get_secret(self, secret_id: str, domain_id: str) -> dict: + if self.token_type == "SYSTEM_TOKEN": + return self.secret_connector.dispatch( + "Secret.get", + {"secret_id": secret_id}, + x_domain_id=domain_id, + ) + else: + return self.secret_connector.dispatch( + "Secret.get", {"secret_id": secret_id} + ) + + def list_secrets(self, query: dict, domain_id: str) -> dict: + if self.token_type == "SYSTEM_TOKEN": + return self.secret_connector.dispatch( + "Secret.list", + {"query": query}, + x_domain_id=domain_id, + ) + else: + return self.secret_connector.dispatch("Secret.list", {"query": query}) + + def get_secret_data(self, secret_id: str, domain_id: str) -> dict: + system_token = config.get_global("TOKEN") + + return self.secret_connector.dispatch( + "Secret.get_data", + {"secret_id": secret_id, "domain_id": domain_id}, + token=system_token, + ) diff --git a/src/spaceone/inventory_v2/model/__init__.py b/src/spaceone/inventory_v2/model/__init__.py index 3eb57ed..43c4165 100644 --- a/src/spaceone/inventory_v2/model/__init__.py +++ b/src/spaceone/inventory_v2/model/__init__.py @@ -1 +1,6 @@ -from spaceone.inventory_v2.model.region_model import Region \ No newline at end of file +from spaceone.inventory_v2.model.region_model import Region +from spaceone.inventory_v2.model.collector.database import Collector +from spaceone.inventory_v2.model.collector_rule.database import CollectorRule +from spaceone.inventory_v2.model.collection_state.database import CollectionState +from 
spaceone.inventory_v2.model.job.database import Job +from spaceone.inventory_v2.model.job_task.database import JobTask diff --git a/src/spaceone/inventory_v2/model/collection_state/__init__.py b/src/spaceone/inventory_v2/model/collection_state/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/spaceone/inventory_v2/model/collection_state/database.py b/src/spaceone/inventory_v2/model/collection_state/database.py new file mode 100644 index 0000000..7c2a218 --- /dev/null +++ b/src/spaceone/inventory_v2/model/collection_state/database.py @@ -0,0 +1,46 @@ +from mongoengine import * +from spaceone.core.model.mongo_model import MongoModel + + +class CollectionState(MongoModel): + collector_id = StringField(max_length=40) + job_task_id = StringField(max_length=40) + secret_id = StringField(max_length=40) + cloud_service_id = StringField(max_length=40) + disconnected_count = IntField(default=0) + domain_id = StringField(max_length=40) + updated_at = DateTimeField(auto_now=True) + + meta = { + "updatable_fields": ["job_task_id", "disconnected_count", "updated_at"], + "indexes": [ + { + "fields": [ + "domain_id", + "cloud_service_id", + "collector_id", + "secret_id", + ], + "name": "COMPOUND_INDEX_FOR_GET", + }, + { + "fields": ["domain_id", "collector_id", "-disconnected_count"], + "name": "COMPOUND_INDEX_FOR_DELETE_1", + }, + { + "fields": ["domain_id", "cloud_service_id"], + "name": "COMPOUND_INDEX_FOR_DELETE_2", + }, + { + "fields": [ + "domain_id", + "collector_id", + "job_task_id", + "secret_id", + "updated_at", + ], + "name": "COMPOUND_INDEX_FOR_DELETE_3", + }, + "cloud_service_id", + ], + } diff --git a/src/spaceone/inventory_v2/model/collector/__init__.py b/src/spaceone/inventory_v2/model/collector/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/spaceone/inventory_v2/model/collector/database.py b/src/spaceone/inventory_v2/model/collector/database.py new file mode 100644 index 0000000..0e085dc --- /dev/null +++ 
b/src/spaceone/inventory_v2/model/collector/database.py
@@ -0,0 +1,91 @@
+from mongoengine import *
+
+from spaceone.core.model.mongo_model import MongoModel
+
+
+class PluginInfo(EmbeddedDocument):
+    plugin_id = StringField(max_length=255)
+    version = StringField(max_length=255)
+    options = DictField()
+    metadata = DictField()
+    upgrade_mode = StringField(
+        max_length=20, default="AUTO", choices=("AUTO", "MANUAL")
+    )
+
+    def to_dict(self):
+        return dict(self.to_mongo())
+
+
+class SecretFilter(EmbeddedDocument):
+    state = StringField(
+        max_length=20, default="DISABLED", choices=("ENABLED", "DISABLED")
+    )
+    secrets = ListField(StringField(max_length=40), default=None, null=True)
+    service_accounts = ListField(StringField(max_length=40), default=None, null=True)
+    schemas = ListField(StringField(max_length=40), default=None, null=True)
+    exclude_secrets = ListField(StringField(max_length=40), default=None, null=True)
+    exclude_service_accounts = ListField(
+        StringField(max_length=40), default=None, null=True
+    )
+    exclude_schemas = ListField(StringField(max_length=40), default=None, null=True)
+
+    def to_dict(self):
+        return dict(self.to_mongo())
+
+
+class Scheduled(EmbeddedDocument):
+    state = StringField(
+        max_length=20, default="DISABLED", choices=("ENABLED", "DISABLED")
+    )
+    hours = ListField(default=list)
+
+    def to_dict(self):
+        return dict(self.to_mongo())
+
+
+class Collector(MongoModel):
+    collector_id = StringField(max_length=40, generate_id="collector", unique=True)
+    name = StringField(max_length=255)
+    provider = StringField(max_length=40, default=None, null=True)
+    capability = DictField()
+    plugin_info = EmbeddedDocumentField(PluginInfo, default=None, null=True)
+    schedule = EmbeddedDocumentField(Scheduled, default=None, null=False)
+    secret_filter = EmbeddedDocumentField(SecretFilter, default=None, null=True)
+    tags = DictField()
+    resource_group = StringField(max_length=40, choices=("DOMAIN", "WORKSPACE"))
+    workspace_id = 
StringField(max_length=40) + domain_id = StringField(max_length=40) + created_at = DateTimeField(auto_now_add=True) + updated_at = DateTimeField(auto_now=True) + last_collected_at = DateTimeField() + + meta = { + "updatable_fields": [ + "name", + "plugin_info", + "schedule", + "secret_filter", + "tags", + "last_collected_at", + ], + "minimal_fields": [ + "collector_id", + "name", + "provider", + "capability", + "plugin_info", + "resource_group", + ], + "change_query_keys": { + "plugin_id": "plugin_info.plugin_id", + "secret_filter_state": "secret_filter.state", + "schedule_state": "schedule.state", + }, + "ordering": ["name"], + "indexes": [ + "provider", + "resource_group", + "workspace_id", + "domain_id", + ], + } diff --git a/src/spaceone/inventory_v2/model/collector/request.py b/src/spaceone/inventory_v2/model/collector/request.py new file mode 100644 index 0000000..89cefdb --- /dev/null +++ b/src/spaceone/inventory_v2/model/collector/request.py @@ -0,0 +1,88 @@ +from typing import Union, Literal, List +from pydantic import BaseModel + +__all__ = [ + "CollectorCreateRequest", + "CollectorUpdateRequest", + "CollectorUpdatePluginRequest", + "CollectorVerifyPluginRequest", + "CollectorDeleteRequest", + "CollectorGetRequest", + "CollectorSearchQueryRequest", + "CollectorStatQueryRequest", + "CollectorCollectRequest" +] + +ScheduleState = Literal["ENABLED", "DISABLED"] +ResourceGroup = Literal["DOMAIN", "WORKSPACE"] +UpgradeMode = Literal["AUTO", "MANUAL"] + + +class CollectorCreateRequest(BaseModel): + name: str + plugin_info: dict + schedule: dict + provider: Union[str, None] = None + secret_filter: Union[dict, None] = None + tags: Union[dict, None] = None + resource_group: ResourceGroup + workspace_id: Union[str, None] = None + domain_id: str + + +class CollectorUpdateRequest(BaseModel): + collector_id: str + name: Union[str, None] = None + schedule: Union[dict, None] = None + secret_filter: Union[dict, None] = None + tags: Union[dict, None] = None + 
workspace_id: Union[str, None] = None + domain_id: str + + +class CollectorUpdatePluginRequest(BaseModel): + collector_id: str + version: Union[str, None] = None + options: Union[dict, None] = None + upgrade_mode: Union[UpgradeMode, None] = None + workspace_id: Union[str, None] = None + domain_id: str + + +class CollectorVerifyPluginRequest(BaseModel): + collector_id: str + secret_id: str + workspace_id: Union[str, None] = None + domain_id: str + + +class CollectorDeleteRequest(BaseModel): + collector_id: str + workspace_id: Union[str, None] = None + domain_id: str + + +class CollectorGetRequest(BaseModel): + collector_id: str + workspace_id: Union[str, None] = None + domain_id: str + + +class CollectorSearchQueryRequest(BaseModel): + query: Union[dict, None] = None + collector_id: Union[str, None] = None + workspace_id: Union[str, None] = None + domain_id: str + + +class CollectorStatQueryRequest(BaseModel): + query: dict + workspace_id: Union[str, None] = None + domain_id: str + + +class CollectorCollectRequest(BaseModel): + collector_id: str + secret_id: str + workspace_id: Union[str, None] = None + domain_id: str diff --git a/src/spaceone/inventory_v2/model/collector/response.py b/src/spaceone/inventory_v2/model/collector/response.py new file mode 100644 index 0000000..4ce5b09 --- /dev/null +++ b/src/spaceone/inventory_v2/model/collector/response.py @@ -0,0 +1,41 @@ +from datetime import datetime +from typing import Union, Literal, List +from pydantic import BaseModel + +from spaceone.core import utils + +from spaceone.inventory_v2.model.collector.request import ResourceGroup + +__all__ = [ + "CollectorResponse", + "CollectorsResponse", +] + + +class CollectorResponse(BaseModel): + collector_id: Union[str, None] = None + name: Union[str, None] = None + provider: Union[str, None] = None + capability: Union[dict, None] = None + secret_filter: Union[dict, None] = None + plugin_info: Union[dict, None] = None + schedule: Union[dict, None] = None + tags: Union[dict, 
None] = None + resource_group: ResourceGroup + workspace_id: Union[str, None] = None + domain_id: Union[str, None] = None + created_at: Union[datetime, None] = None + updated_at: Union[datetime, None] = None + last_collected_at: Union[datetime, None] = None + + def dict(self, *args, **kwargs): + data = super().dict(*args, **kwargs) + data["created_at"] = utils.datetime_to_iso8601(data["created_at"]) + data["updated_at"] = utils.datetime_to_iso8601(data["updated_at"]) + data["last_collected_at"] = utils.datetime_to_iso8601(data.get("last_collected_at")) + return data + + +class CollectorsResponse(BaseModel): + results: List[CollectorResponse] + total_count: int diff --git a/src/spaceone/inventory_v2/model/collector_rule/__init__.py b/src/spaceone/inventory_v2/model/collector_rule/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/spaceone/inventory_v2/model/collector_rule/database.py b/src/spaceone/inventory_v2/model/collector_rule/database.py new file mode 100644 index 0000000..3721fc9 --- /dev/null +++ b/src/spaceone/inventory_v2/model/collector_rule/database.py @@ -0,0 +1,62 @@ +from mongoengine import * + +from spaceone.core.model.mongo_model import MongoModel + + +class CollectorRuleCondition(EmbeddedDocument): + key = StringField(required=True) + value = StringField(required=True) + operator = StringField(choices=("eq", "contain", "not", "not_contain")) + + +class CollectorRuleOptions(EmbeddedDocument): + stop_processing = BooleanField(default=False) + + +class CollectorRule(MongoModel): + collector_rule_id = StringField( + max_length=40, generate_id="collector-rule", unique=True + ) + name = StringField(max_length=255, default="") + rule_type = StringField( + max_length=255, default="CUSTOM", choices=("MANAGED", "CUSTOM") + ) + order = IntField(required=True) + conditions = ListField(EmbeddedDocumentField(CollectorRuleCondition), default=[]) + conditions_policy = StringField(max_length=20, choices=("ALL", "ANY", "ALWAYS")) + actions = 
DictField(default={}) + options = EmbeddedDocumentField(CollectorRuleOptions, default=CollectorRuleOptions) + tags = DictField(default={}) + collector = ReferenceField("Collector", reverse_delete_rule=CASCADE) + collector_id = StringField(max_length=40) + resource_group = StringField(max_length=40, choices=("DOMAIN", "WORKSPACE")) + workspace_id = StringField(max_length=40) + domain_id = StringField(max_length=40) + created_at = DateTimeField(auto_now_add=True) + + meta = { + "updatable_fields": [ + "name", + "order", + "conditions", + "conditions_policy", + "actions", + "options", + "tags", + ], + "minimal_fields": [ + "collector_rule_id", + "name", + "order", + "rule_type", + "collector_id", + ], + "ordering": ["order"], + "indexes": [ + "rule_type", + "conditions_policy", + "collector_id", + "workspace_id", + "domain_id", + ], + } diff --git a/src/spaceone/inventory_v2/model/job/__init__.py b/src/spaceone/inventory_v2/model/job/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/spaceone/inventory_v2/model/job/database.py b/src/spaceone/inventory_v2/model/job/database.py new file mode 100644 index 0000000..4403de8 --- /dev/null +++ b/src/spaceone/inventory_v2/model/job/database.py @@ -0,0 +1,66 @@ +from mongoengine import * + +from spaceone.core.model.mongo_model import MongoModel +from spaceone.inventory_v2.model.collector.database import Collector + + +class Job(MongoModel): + job_id = StringField(max_length=40, generate_id="job", unique=True) + status = StringField( + max_length=20, + default="IN_PROGRESS", + choices=("CANCELED", "IN_PROGRESS", "FAILURE", "SUCCESS"), + ) + total_tasks = IntField(min_value=0, default=0) + remained_tasks = IntField(default=0) + success_tasks = IntField(min_value=0, default=0) + failure_tasks = IntField(min_value=0, default=0) + collector_id = StringField(max_length=40) + request_secret_id = StringField(max_length=40, null=True, default=None) + request_workspace_id = StringField(max_length=40, null=True, 
default=None) + plugin_id = StringField(max_length=40) + resource_group = StringField(max_length=40, choices=("DOMAIN", "WORKSPACE")) + workspace_id = StringField(max_length=40) + domain_id = StringField(max_length=40) + created_at = DateTimeField(auto_now_add=True) + updated_at = DateTimeField(auto_now=True) + finished_at = DateTimeField(default=None, null=True) + + meta = { + "updatable_fields": [ + "status", + "total_tasks", + "remained_tasks", + "success_tasks", + "failure_tasks", + "collector_id", + "updated_at", + "finished_at", + ], + "minimal_fields": [ + "job_id", + "status", + "created_at", + "finished_at", + ], + "reference_query_keys": {"collector": Collector}, + "ordering": ["-created_at"], + "indexes": [ + { + "fields": ["domain_id", "collector_id", "status"], + "name": "COMPOUND_INDEX_FOR_GC_1", + }, + { + "fields": ["domain_id", "-created_at", "status"], + "name": "COMPOUND_INDEX_FOR_GC_2", + }, + { + "fields": ["domain_id", "workspace_id"], + "name": "COMPOUND_INDEX_FOR_SEARCH_1", + }, + "status", + "collector_id", + "workspace_id", + "domain_id", + ], + } diff --git a/src/spaceone/inventory_v2/model/job/request.py b/src/spaceone/inventory_v2/model/job/request.py new file mode 100644 index 0000000..432f396 --- /dev/null +++ b/src/spaceone/inventory_v2/model/job/request.py @@ -0,0 +1,6 @@ +from typing import Union, Literal, List +from pydantic import BaseModel + +__all__ = [] + +Status = Literal['CANCELED', 'IN_PROGRESS', 'FAILURE', 'SUCCESS'] diff --git a/src/spaceone/inventory_v2/model/job/response.py b/src/spaceone/inventory_v2/model/job/response.py new file mode 100644 index 0000000..b96f026 --- /dev/null +++ b/src/spaceone/inventory_v2/model/job/response.py @@ -0,0 +1,37 @@ +from datetime import datetime +from typing import Union, Literal, List +from pydantic import BaseModel + +from spaceone.core import utils + +from spaceone.inventory_v2.model.job.request import Status + +__all__ = [ + "JobResponse" +] + + +class JobResponse(BaseModel): + 
job_id: Union[str, None] = None + status: Union[Status, None] = None + total_tasks: Union[int, None] = None + remained_tasks: Union[int, None] = None + success_tasks: Union[int, None] = None + failure_tasks: Union[int, None] = None + collector_id: Union[str, None] = None + request_secret_id: Union[str, None] = None + request_workspace_id: Union[str, None] = None + plugin_id: Union[str, None] = None + resource_group: Union[str, None] = None + workspace_id: Union[str, None] = None + domain_id: Union[str, None] = None + created_at: Union[datetime, None] = None + updated_at: Union[datetime, None] = None + finished_at: Union[datetime, None] = None + + def dict(self, *args, **kwargs): + data = super().dict(*args, **kwargs) + data["created_at"] = utils.datetime_to_iso8601(data["created_at"]) + data["updated_at"] = utils.datetime_to_iso8601(data["updated_at"]) + data["finished_at"] = utils.datetime_to_iso8601(data.get("finished_at")) + return data diff --git a/src/spaceone/inventory_v2/model/job_task/__init__.py b/src/spaceone/inventory_v2/model/job_task/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/spaceone/inventory_v2/model/job_task/database.py b/src/spaceone/inventory_v2/model/job_task/database.py new file mode 100644 index 0000000..43e218f --- /dev/null +++ b/src/spaceone/inventory_v2/model/job_task/database.py @@ -0,0 +1,92 @@ +from mongoengine import * +from spaceone.core.model.mongo_model import MongoModel + + +class Error(EmbeddedDocument): + error_code = StringField() + message = StringField() + additional = DictField() + + +class JobTask(MongoModel): + job_task_id = StringField(max_length=40, generate_id="job-task", unique=True) + status = StringField( + max_length=20, + default="PENDING", + choices=("PENDING", "CANCELED", "IN_PROGRESS", "SUCCESS", "FAILURE"), + ) + provider = StringField(max_length=40, default=None, null=True) + total_sub_tasks = IntField(default=0) + remained_sub_tasks = IntField(default=0) + created_count = 
IntField(default=0) + updated_count = IntField(default=0) + deleted_count = IntField(default=0) + disconnected_count = IntField(default=0) + failure_count = IntField(default=0) + total_count = IntField(default=0) + errors = ListField(EmbeddedDocumentField(Error, default=None, null=True)) + job_id = StringField(max_length=40) + secret_id = StringField(max_length=40) + collector_id = StringField(max_length=40) + service_account_id = StringField(max_length=40) + project_id = StringField(max_length=40) + workspace_id = StringField(max_length=40) + domain_id = StringField(max_length=40) + created_at = DateTimeField(auto_now_add=True) + started_at = DateTimeField(default=None, null=True) + finished_at = DateTimeField(default=None, null=True) + + meta = { + "updatable_fields": [ + "status", + "provider", + "remained_sub_tasks", + "created_count", + "updated_count", + "deleted_count", + "disconnected_count", + "failure_count", + "errors", + "started_at", + "finished_at", + ], + "minimal_fields": [ + "job_task_id", + "status", + "created_count", + "updated_count", + "deleted_count", + "disconnected_count", + "failure_count", + "job_id", + "created_at", + "started_at", + "finished_at", + ], + "change_query_keys": {"user_projects": "project_id"}, + "ordering": ["-created_at"], + "indexes": [ + { + "fields": ["domain_id", "collector_id", "status"], + "name": "COMPOUND_INDEX_FOR_GC_1", + }, + { + "fields": ["domain_id", "job_id"], + "name": "COMPOUND_INDEX_FOR_GC_2", + }, + { + "fields": ["domain_id", "-created_at", "status"], + "name": "COMPOUND_INDEX_FOR_GC_3", + }, + { + "fields": ["domain_id", "workspace_id", "project_id"], + "name": "COMPOUND_INDEX_FOR_SEARCH_1", + }, + "status", + "job_id", + "collector_id", + "project_id", + "workspace_id", + "domain_id", + ], + } diff --git a/src/spaceone/inventory_v2/service/collector_service.py b/src/spaceone/inventory_v2/service/collector_service.py new file mode 100644 index 0000000..afd9b9e --- /dev/null +++ 
import logging
from typing import Union, Tuple

from mongoengine import QuerySet
from spaceone.core import utils
from spaceone.core.error import *
from spaceone.core.service import *

from spaceone.inventory_v2.manager.collection_state_manager import (
    CollectionStateManager,
)
from spaceone.inventory_v2.manager.collector_manager import CollectorManager
from spaceone.inventory_v2.manager.collector_plugin_manager import (
    CollectorPluginManager,
)
from spaceone.inventory_v2.manager.collector_rule_manager import CollectorRuleManager
from spaceone.inventory_v2.manager.identity_manager import IdentityManager
from spaceone.inventory_v2.manager.job_manager import JobManager
from spaceone.inventory_v2.manager.job_task_manager import JobTaskManager
from spaceone.inventory_v2.manager.plugin_manager import PluginManager
from spaceone.inventory_v2.manager.repository_manager import RepositoryManager
from spaceone.inventory_v2.manager.secret_manager import SecretManager
from spaceone.inventory_v2.model import Collector
from spaceone.inventory_v2.model.collector.request import *
from spaceone.inventory_v2.model.collector.response import *
from spaceone.inventory_v2.model.job.response import JobResponse

_LOGGER = logging.getLogger(__name__)
_KEYWORD_FILTER = ["collector_id", "name", "provider"]


@authentication_handler
@authorization_handler
@mutation_handler
@event_handler
class CollectorService(BaseService):
    resource = "Collector"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Shared manager instance reused by every handler in this service.
        self.collector_mgr = CollectorManager()

    @transaction(
        permission="inventory-v2:Collector.write",
        role_types=["DOMAIN_ADMIN", "WORKSPACE_OWNER"],
    )
    @convert_model
    def create(self, params: CollectorCreateRequest) -> Union[CollectorResponse, dict]:
        """Create collector

        Args:
            params (dict): {
                'name': 'str',              # required
                'plugin_info': 'dict',      # required
                'schedule': 'dict',
                'secret_filter': 'dict',
                'provider': 'str',
                'tags': 'dict',
                'resource_group': 'str',    # required
                'workspace_id': 'str',      # injected from auth
                'domain_id': 'str'          # injected from auth (required)
            }

        Returns:
            CollectorResponse

        Raises:
            ERROR_REQUIRED_PARAMETER: resource_group is WORKSPACE without workspace_id
            ERROR_INVALID_PARAMETER: invalid schedule, secret filter or provider
        """

        identity_mgr = IdentityManager()
        secret_mgr = SecretManager()

        domain_id = params.domain_id
        workspace_id = params.workspace_id
        resource_group = params.resource_group

        # Check permission by resource group
        if resource_group == "WORKSPACE":
            if workspace_id is None:
                raise ERROR_REQUIRED_PARAMETER(key="workspace_id")

            identity_mgr.check_workspace(workspace_id, domain_id)
        else:
            # DOMAIN scope: the collector is visible to every workspace.
            params.workspace_id = "*"

        if schedule := params.schedule:
            self._check_schedule(schedule)

        plugin_manager = PluginManager()
        collector_plugin_mgr = CollectorPluginManager()

        create_params = params.dict()
        plugin_info = params.plugin_info
        plugin_id = plugin_info["plugin_id"]

        plugin_info_from_repository = self._get_plugin_from_repository(plugin_id)
        capability = plugin_info_from_repository.get("capability", {})
        plugin_provider = self._get_plugin_providers(
            params.provider, plugin_info_from_repository
        )

        create_params["capability"] = capability
        create_params["provider"] = plugin_provider

        # BUGFIX: the original checked `"secret_filter" in params`, but pydantic
        # models iterate over (key, value) pairs, so that membership test was
        # always False and the validation branch was dead code. Inspect the
        # converted dict instead.
        secret_filter = create_params.get("secret_filter")
        if secret_filter and secret_filter.get("state") == "ENABLED":
            self._validate_secret_filter(
                identity_mgr,
                secret_mgr,
                secret_filter,
                plugin_provider,
                domain_id,
            )
        else:
            # An absent or disabled filter is not stored with the collector.
            create_params.pop("secret_filter", None)

        collector_vo = self.collector_mgr.create_collector(create_params)

        endpoint, updated_version = plugin_manager.get_endpoint(
            plugin_info["plugin_id"],
            domain_id,
            plugin_info.get("upgrade_mode", "AUTO"),
            plugin_info.get("version"),
        )

        plugin_response = collector_plugin_mgr.init_plugin(
            endpoint, plugin_info.get("options", {})
        )

        if updated_version:
            plugin_info["version"] = updated_version

        plugin_info["metadata"] = plugin_response.get("metadata", {})

        collector_vo = self.collector_mgr.update_collector_by_vo(
            {"plugin_info": plugin_info}, collector_vo
        )

        # Managed collector rules are derived from the plugin's metadata.
        collector_rules = plugin_info["metadata"].get("collector_rules", [])
        self.create_collector_rules_by_metadata(
            collector_rules,
            collector_vo.collector_id,
            resource_group,
            domain_id,
            workspace_id,
        )

        return CollectorResponse(**collector_vo.to_dict())

    @transaction(
        permission="inventory-v2:Collector.write",
        role_types=["DOMAIN_ADMIN", "WORKSPACE_OWNER"],
    )
    @convert_model
    def update(self, params: CollectorUpdateRequest) -> Union[CollectorResponse, dict]:
        """Update collector

        Args:
            params (dict): {
                'collector_id': 'str',      # required
                'name': 'str',
                'schedule': 'dict',
                'secret_filter': 'dict',
                'tags': 'dict',
                'workspace_id': 'str',      # injected from auth
                'domain_id': 'str'          # injected from auth (required)
            }

        Returns:
            CollectorResponse
        """

        domain_id = params.domain_id
        workspace_id = params.workspace_id
        collector_id = params.collector_id

        if schedule := params.schedule:
            self._check_schedule(schedule)

        collector_vo = self.collector_mgr.get_collector(
            collector_id, domain_id, workspace_id
        )

        if secret_filter := params.secret_filter:
            if secret_filter.get("state") == "ENABLED":
                identity_mgr = IdentityManager()
                secret_mgr = SecretManager()

                self._validate_secret_filter(
                    identity_mgr,
                    secret_mgr,
                    secret_filter,
                    collector_vo.provider,
                    domain_id,
                )
            else:
                # Disabling the filter clears all previous filter settings.
                params.secret_filter = {
                    "state": "DISABLED",
                }

        collector_vo = self.collector_mgr.update_collector_by_vo(
            params.dict(exclude_unset=True), collector_vo
        )
        return CollectorResponse(**collector_vo.to_dict())

    @transaction(
        permission="inventory-v2:Collector.write",
        role_types=["DOMAIN_ADMIN", "WORKSPACE_OWNER"],
    )
    @convert_model
    def update_plugin(
        self, params: CollectorUpdatePluginRequest
    ) -> Union[CollectorResponse, dict]:
        """Update plugin info of collector

        Args:
            params (dict): {
                'collector_id': 'str',      # required
                'version': 'str',
                'options': 'dict',
                'upgrade_mode': 'str',
                'workspace_id': 'str',      # injected from auth
                'domain_id': 'str'          # injected from auth (required)
            }

        Returns:
            CollectorResponse
        """

        plugin_manager = PluginManager()

        collector_id = params.collector_id
        domain_id = params.domain_id
        workspace_id = params.workspace_id

        collector_vo = self.collector_mgr.get_collector(
            collector_id, domain_id, workspace_id
        )
        plugin_info = collector_vo.plugin_info.to_dict()

        # Only overwrite the fields the caller actually supplied.
        if version := params.version:
            plugin_info["version"] = version

        if options := params.options:
            plugin_info["options"] = options

        if upgrade_mode := params.upgrade_mode:
            plugin_info["upgrade_mode"] = upgrade_mode

        endpoint, updated_version = plugin_manager.get_endpoint(
            plugin_info["plugin_id"],
            domain_id,
            plugin_info.get("upgrade_mode", "AUTO"),
            plugin_info.get("version"),
        )

        collector_vo = self._update_collector_plugin(
            endpoint, updated_version, plugin_info, collector_vo
        )
        return CollectorResponse(**collector_vo.to_dict())

    @transaction(
        permission="inventory-v2:Collector.write",
        role_types=["DOMAIN_ADMIN", "WORKSPACE_OWNER"],
    )
    @convert_model
    def verify_plugin(self, params: CollectorVerifyPluginRequest) -> None:
        """Verify plugin info of collector

        Args:
            params (dict): {
                'collector_id': 'str',      # required
                'secret_id': 'str',
                'workspace_id': 'str',      # injected from auth
                'domain_id': 'str'          # injected from auth (required)
            }

        Returns:
            None
        """

        collector_plugin_mgr = CollectorPluginManager()

        plugin_manager = PluginManager()
        secret_manager = SecretManager()

        collector_id = params.collector_id
        # BUGFIX: was `params.collector_id`, which passed the collector id as
        # the domain id and broke every downstream lookup.
        domain_id = params.domain_id
        workspace_id = params.workspace_id

        collector_vo = self.collector_mgr.get_collector(
            collector_id, domain_id, workspace_id
        )

        if collector_vo.resource_group == "WORKSPACE":
            collector_workspace_id = collector_vo.workspace_id
        else:
            collector_workspace_id = None

        plugin_info = collector_vo.plugin_info.to_dict()

        # BUGFIX: argument order now matches get_endpoint's signature as used
        # everywhere else in this service: (plugin_id, domain_id, upgrade_mode,
        # version). The original passed (plugin_id, version, upgrade_mode,
        # domain_id).
        endpoint, updated_version = plugin_manager.get_endpoint(
            plugin_info["plugin_id"],
            domain_id,
            plugin_info.get("upgrade_mode", "AUTO"),
            plugin_info.get("version"),
        )

        secret_ids = self._get_secret_ids_from_filter(
            collector_vo.secret_filter.to_dict(),
            collector_vo.provider,
            domain_id,
            params.secret_id,
            collector_workspace_id,
        )

        # Verification only needs one reachable secret.
        if secret_ids:
            secret_data_info = secret_manager.get_secret_data(secret_ids[0], domain_id)
            secret_data = secret_data_info.get("data", {})
            collector_plugin_mgr.verify_plugin(
                endpoint, plugin_info.get("options", {}), secret_data
            )

    @transaction(
        permission="inventory-v2:Collector.write",
        role_types=["DOMAIN_ADMIN", "WORKSPACE_OWNER"],
    )
    @check_required(["collector_id", "domain_id"])
    @convert_model
    def delete(self, params: CollectorDeleteRequest) -> None:
        """Delete collector

        Args:
            params (dict): {
                'collector_id': 'str',      # required
                'workspace_id': 'str',      # injected from auth
                'domain_id': 'str'          # injected from auth (required)
            }

        Returns:
            None
        """
        state_mgr = CollectionStateManager()
        job_mgr = JobManager()
        job_task_mgr = JobTaskManager()

        collector_id = params.collector_id
        domain_id = params.domain_id
        workspace_id = params.workspace_id

        collector_vo: Collector = self.collector_mgr.get_collector(
            collector_id, domain_id, workspace_id
        )

        # Cascade: remove collection states, jobs and job tasks that belong
        # to this collector before deleting the collector itself.
        state_mgr.delete_collection_state_by_collector_id(collector_id, domain_id)

        job_vos = job_mgr.filter_jobs(collector_id=collector_id, domain_id=domain_id)
        job_vos.delete()

        job_task_vos = job_task_mgr.filter_job_tasks(
            collector_id=collector_id, domain_id=domain_id
        )
        job_task_vos.delete()

        self.collector_mgr.delete_collector_by_vo(collector_vo)
permission="inventory-v2:Collector.read", + role_types=["DOMAIN_ADMIN", "WORKSPACE_OWNER", "WORKSPACE_MEMBER"], + ) + @change_value_by_rule("APPEND", "workspace_id", "*") + @convert_model + def get(self, params: CollectorGetRequest) -> Union[CollectorResponse, dict]: + """Get collector + Args: + params (dict): { + 'collector_id': 'str', # required + 'workspace_id': 'str', # injected from auth + 'domain_id': 'str' # injected from auth (required) + } + + Returns: + collector_vo (object) + """ + + collector_mgr: CollectorManager = self.locator.get_manager(CollectorManager) + collector_id = params.collector_id + domain_id = params.domain_id + workspace_id = params.workspace_id + + collector_vo = collector_mgr.get_collector( + collector_id, domain_id, workspace_id + ) + + return CollectorResponse(**collector_vo.to_dict()) + + @transaction( + permission="inventory-v2:Collector.read", + role_types=["DOMAIN_ADMIN", "WORKSPACE_OWNER", "WORKSPACE_MEMBER"], + ) + @change_value_by_rule("APPEND", "workspace_id", "*") + @check_required(["domain_id"]) + @append_query_filter( + [ + "collector_id", + "name", + "state", + "secret_filter_state", + "schedule_state", + "plugin_id", + "workspace_id", + "domain_id", + ] + ) + @append_keyword_filter(_KEYWORD_FILTER) + @convert_model + def list( + self, params: CollectorSearchQueryRequest + ) -> Union[CollectorsResponse, dict]: + """List collectors + Args: + params (dict): { + 'query': 'dict (spaceone.api.core.v1.Query)', + 'collector_id': 'str', + 'name': 'str', + 'secret_filter_state': 'str', + 'schedule_state': 'str', + 'plugin_id': 'str', + 'workspace_id': 'str', # injected from auth + 'domain_id': 'str', # injected from auth (required) + } + + Returns: + results (list) + total_count (int) + """ + query = params.query or {} + collector_vos, total_count = self.collector_mgr.list_collectors(query=query) + + collectors_info = [collector_vo.to_dict() for collector_vo in collector_vos] + return CollectorsResponse(results=collectors_info, 
total_count=total_count) + + @transaction( + permission="inventory-v2:Collector.read", + role_types=["DOMAIN_ADMIN", "WORKSPACE_OWNER", "WORKSPACE_MEMBER"], + ) + @change_value_by_rule("APPEND", "workspace_id", "*") + @append_query_filter(["workspace_id", "domain_id"]) + @append_keyword_filter(_KEYWORD_FILTER) + @convert_model + def stat(self, params: CollectorStatQueryRequest) -> dict: + """Stat collectors + Args: + params (dict): { + 'query': 'dict (spaceone.api.core.v1.StatisticsQuery)', # required + 'workspace_id': 'str', # injected from auth + 'domain_id': 'str', # injected from auth (required) + } + + Returns: + values (list) : 'list of statistics data' + + """ + query = params.query or {} + return self.collector_mgr.stat_collectors(query) + + @transaction( + permission="inventory-v2:Collector.write", + role_types=["DOMAIN_ADMIN", "WORKSPACE_OWNER", "WORKSPACE_MEMBER"], + ) + @change_value_by_rule("APPEND", "workspace_id", "*") + @convert_model + def collect(self, params: CollectorCollectRequest) -> Union[JobResponse, dict]: + """Collect data + Args: + params (dict): { + 'collector_id': 'str', # required + 'secret_id': 'str', + 'workspace_id': 'str | list', # injected from auth + 'domain_id': 'str', # injected from auth (required) + 'user_projects': 'list', # injected from auth + } + + Returns: + job_vo (object) + """ + + plugin_mgr: PluginManager = self.locator.get_manager(PluginManager) + job_mgr: JobManager = self.locator.get_manager(JobManager) + job_task_mgr: JobTaskManager = self.locator.get_manager(JobTaskManager) + + collector_id = params.collector_id + domain_id = params.domain_id + workspace_id = params.workspace_id + + collector_vo = self.collector_mgr.get_collector( + collector_id, domain_id, workspace_id + ) + collector_data = collector_vo.to_dict() + + if collector_data["resource_group"] == "WORKSPACE": + collector_workspace_id = collector_data["workspace_id"] + else: + collector_workspace_id = None + + plugin_info = 
collector_data["plugin_info"] + secret_filter = collector_data.get("secret_filter", {}) or {} + plugin_id = plugin_info["plugin_id"] + version = plugin_info.get("version") + upgrade_mode = plugin_info.get("upgrade_mode", "AUTO") + + endpoint, updated_version = plugin_mgr.get_endpoint( + plugin_id, domain_id, upgrade_mode, version + ) + + if updated_version and version != updated_version: + _LOGGER.debug( + f"[collect] upgrade plugin version: {version} -> {updated_version}" + ) + collector_vo = self._update_collector_plugin( + endpoint, updated_version, plugin_info, collector_vo + ) + + tasks = self._get_tasks( + params.dict(), + endpoint, + collector_id, + collector_vo.provider, + plugin_info, + secret_filter, + domain_id, + collector_workspace_id, + ) + + duplicated_job_vos = job_mgr.get_duplicate_jobs( + collector_id, domain_id, workspace_id, params.secret_id + ) + + for job_vo in duplicated_job_vos: + job_mgr.make_canceled_by_vo(job_vo) + + # create job + create_job_params = params.dict() + create_job_params["plugin_id"] = plugin_id + create_job_params["total_tasks"] = len(tasks) + create_job_params["remained_tasks"] = len(tasks) + job_vo = job_mgr.create_job(collector_vo, create_job_params) + + _LOGGER.debug(f"[collect] total tasks ({job_vo.job_id}): {len(tasks)}") + if len(tasks) > 0: + for task in tasks: + secret_info = task["secret_info"] + sub_tasks = task.get("sub_tasks", []) + if len(sub_tasks) == 0: + sub_task_count = 1 + else: + sub_task_count = len(sub_tasks) + + if "sub_tasks" in task: + del task["sub_tasks"] + + create_params = { + "total_sub_tasks": sub_task_count, + "remained_sub_tasks": sub_task_count, + "job_id": job_vo.job_id, + "collector_id": job_vo.collector_id, + "secret_id": secret_info.get("secret_id"), + "service_account_id": secret_info.get("service_account_id"), + "project_id": secret_info.get("project_id"), + "workspace_id": secret_info.get("workspace_id"), + "domain_id": domain_id, + } + + task.update({"collector_id": collector_id, 
"job_id": job_vo.job_id}) + + try: + # create job task + job_task_vo = job_task_mgr.create_job_task(create_params) + task.update({"job_task_id": job_task_vo.job_task_id}) + + if len(sub_tasks) > 0: + for sub_task in sub_tasks: + task_options = sub_task.get("task_options", {}) + task.update( + {"task_options": task_options, "is_sub_task": True} + ) + _LOGGER.debug( + f"[collect] push sub task ({job_task_vo.job_task_id}) => {utils.dump_json(task_options)}" + ) + job_task_mgr.push_job_task(task) + else: + _LOGGER.debug( + f"[collect] push job task ({job_task_vo.job_task_id})" + ) + job_task_mgr.push_job_task(task) + + except Exception as e: + _LOGGER.error( + f"[collect] Error to create job task ({job_vo.job_id}): {e}", + exc_info=True, + ) + job_mgr.make_failure_by_vo(job_vo) + + self.collector_mgr.update_last_collected_time(collector_vo) + else: + # close job if no tasks + job_mgr.make_success_by_vo(job_vo) + + return JobResponse(**job_vo.to_dict()) + + def _get_tasks( + self, + params: dict, + endpoint: str, + collector_id: str, + collector_provider: str, + plugin_info: dict, + secret_filter: dict, + domain_id: str, + collector_workspace_id: str = None, + ) -> list: + secret_mgr: SecretManager = self.locator.get_manager(SecretManager) + collector_plugin_mgr: CollectorPluginManager = self.locator.get_manager( + CollectorPluginManager + ) + + tasks = [] + secret_ids = self._get_secret_ids_from_filter( + secret_filter, + collector_provider, + domain_id, + params.get("secret_id"), + collector_workspace_id, + ) + + for secret_id in secret_ids: + secret_info = secret_mgr.get_secret(secret_id, domain_id) + secret_data = secret_mgr.get_secret_data(secret_id, domain_id) + _task = { + "plugin_info": plugin_info, + "secret_info": secret_info, + "secret_data": secret_data, + "domain_id": domain_id, + } + + try: + response = collector_plugin_mgr.get_tasks( + endpoint, + secret_data.get("data", {}), + plugin_info.get("options", {}), + ) + _LOGGER.debug(f"[get_tasks] sub 
tasks({collector_id}): {response}") + _task["sub_tasks"] = response.get("tasks", []) + + except Exception as e: + pass + + tasks.append(_task) + + return tasks + + @staticmethod + def _check_secrets( + secret_mgr: SecretManager, secret_ids: list, provider: str, domain_id: str + ) -> None: + query = { + "filter": [ + {"k": "secret_id", "v": secret_ids, "o": "in"}, + {"k": "provider", "v": provider, "o": "eq"}, + ], + "count_only": True, + } + response = secret_mgr.list_secrets(query, domain_id) + total_count = response.get("total_count", 0) + + if total_count != len(secret_ids): + raise ERROR_INVALID_PARAMETER( + key="secret_filter.secrets", + reason=f"secrets are not found: {', '.join(secret_ids)}", + ) + + @staticmethod + def _check_service_accounts( + identity_mgr: IdentityManager, + service_account_ids: list, + provider: str, + domain_id: str, + ) -> None: + query = { + "filter": [ + { + "k": "service_account_id", + "v": service_account_ids, + "o": "in", + }, + {"k": "provider", "v": provider, "o": "eq"}, + ], + "count_only": True, + } + + response = identity_mgr.list_service_accounts(query, domain_id) + total_count = response.get("total_count", 0) + + if total_count != len(service_account_ids): + raise ERROR_INVALID_PARAMETER( + key="secret_filter.service_accounts", + reason=f"service accounts are not found: {', '.join(service_account_ids)}", + ) + + @staticmethod + def _check_schemas( + identity_mgr: IdentityManager, + schema_ids: list, + provider: str, + domain_id: str, + ) -> None: + query = { + "filter": [ + { + "k": "schema_id", + "v": schema_ids, + "o": "in", + }, + {"k": "provider", "v": provider, "o": "eq"}, + ], + "count_only": True, + } + + response = identity_mgr.list_schemas(query, domain_id) + total_count = response.get("total_count", 0) + + if total_count != len(schema_ids): + raise ERROR_INVALID_PARAMETER( + key="secret_filter.schemas", + reason=f"schemas are not found: {', '.join(schema_ids)}", + ) + + def _validate_secret_filter( + self, + 
identity_mgr: IdentityManager, + secret_mgr: SecretManager, + secret_filter: dict, + provider: str, + domain_id: str, + ) -> None: + if "secrets" in secret_filter: + self._check_secrets( + secret_mgr, secret_filter["secrets"], provider, domain_id + ) + + if "service_accounts" in secret_filter: + self._check_service_accounts( + identity_mgr, secret_filter["service_accounts"], provider, domain_id + ) + + if "schemas" in secret_filter: + self._check_schemas( + identity_mgr, secret_filter["schemas"], provider, domain_id + ) + + if "exclude_secrets" in secret_filter: + self._check_secrets( + secret_mgr, secret_filter["exclude_secrets"], provider, domain_id + ) + + if "exclude_service_accounts" in secret_filter: + self._check_service_accounts( + identity_mgr, + secret_filter["exclude_service_accounts"], + provider, + domain_id, + ) + + if "exclude_schemas" in secret_filter: + self._check_schemas( + identity_mgr, secret_filter["exclude_schemas"], provider, domain_id + ) + + def _update_collector_plugin( + self, + endpoint: str, + updated_version: str, + plugin_info: dict, + collector_vo: Collector, + ) -> Collector: + collector_plugin_mgr = CollectorPluginManager() + plugin_response = collector_plugin_mgr.init_plugin( + endpoint, plugin_info.get("options", {}) + ) + + if updated_version: + plugin_info["version"] = updated_version + + plugin_info["metadata"] = plugin_response.get("metadata", {}) + + collector_vo = self.collector_mgr.update_collector_by_vo( + {"plugin_info": plugin_info}, collector_vo + ) + + self.delete_collector_rules(collector_vo.collector_id, collector_vo.domain_id), + + collector_rules = plugin_info["metadata"].get("collector_rules", []) + self.create_collector_rules_by_metadata( + collector_rules, + collector_vo.collector_id, + collector_vo.resource_group, + collector_vo.domain_id, + collector_vo.workspace_id, + ) + + return collector_vo + + def _get_secret_ids_from_filter( + self, + secret_filter: dict, + provider: str, + domain_id: str, + secret_id: 
str = None, + workspace_id: str = None, + ) -> list: + secret_manager: SecretManager = self.locator.get_manager(SecretManager) + + query = { + "filter": self._make_secret_filter( + secret_filter, provider, secret_id, workspace_id + ) + } + + response = secret_manager.list_secrets(query, domain_id) + + return [ + secret_info.get("secret_id") for secret_info in response.get("results", []) + ] + + @check_required(["hour"]) + def scheduled_collectors(self, params: dict) -> Tuple[QuerySet, int]: + """Search all collectors in this schedule. + This is global search out-of domain. + + Args: + params(dict): { + 'hour': 'int', # required + } + + Returns: + results (list) + total_count (int) + """ + + collector_mgr: CollectorManager = self.locator.get_manager(CollectorManager) + query = { + "filter": [ + {"k": "schedule.state", "v": "ENABLED", "o": "eq"}, + {"k": "schedule.hours", "v": params["hour"], "o": "contain"}, + ] + } + return collector_mgr.list_collectors(query) + + @staticmethod + def _get_plugin_from_repository(plugin_id: str) -> dict: + repo_mgr = RepositoryManager() + return repo_mgr.get_plugin(plugin_id) + + @staticmethod + def create_collector_rules_by_metadata( + collector_rules: list, + collector_id: str, + resource_group: str, + domain_id: str, + workspace_id: str = None, + ): + collector_rule_mgr = CollectorRuleManager() + + for collector_rule_params in collector_rules: + collector_rule_params.update( + { + "collector_id": collector_id, + "rule_type": "MANAGED", + "resource_group": resource_group, + "workspace_id": workspace_id, + "domain_id": domain_id, + } + ) + + collector_rule_mgr.create_collector_rule(collector_rule_params) + + @staticmethod + def delete_collector_rules(collector_id: str, domain_id: str) -> None: + collector_rule_mgr = CollectorRuleManager() + old_collector_rule_vos = collector_rule_mgr.filter_collector_rules( + collector_id=collector_id, rule_type="MANAGED", domain_id=domain_id + ) + old_collector_rule_vos.delete() + + @staticmethod + 
def _make_secret_filter( + secret_filter: dict, + provider: str, + secret_id: str = None, + workspace_id: str = None, + ) -> list: + _filter = [{"k": "provider", "v": provider, "o": "eq"}] + + if secret_id: + _filter.append({"k": "secret_id", "v": secret_id, "o": "eq"}) + + if workspace_id: + _filter.append({"k": "workspace_id", "v": workspace_id, "o": "eq"}) + + if secret_filter.get("state") == "ENABLED": + if secrets := secret_filter.get("secrets"): + _filter.append({"k": "secret_id", "v": secrets, "o": "in"}) + + if service_accounts := secret_filter.get("service_accounts"): + _filter.append( + {"k": "service_account_id", "v": service_accounts, "o": "in"} + ) + + if schemas := secret_filter.get("schemas"): + _filter.append({"k": "schema", "v": schemas, "o": "in"}) + + if exclude_secrets := secret_filter.get("exclude_secrets"): + _filter.append({"k": "secret_id", "v": exclude_secrets, "o": "not_in"}) + + if exclude_service_accounts := secret_filter.get( + "exclude_service_accounts" + ): + _filter.append( + { + "k": "service_account_id", + "v": exclude_service_accounts, + "o": "not_in", + } + ) + + if exclude_schemas := secret_filter.get("exclude_schemas"): + _filter.append({"k": "schema", "v": exclude_schemas, "o": "not_in"}) + + return _filter + + @staticmethod + def _get_plugin_providers(provider: str, plugin_info: dict) -> str: + supported_providers = plugin_info.get("capability", {}).get( + "supported_providers", [] + ) + + if supported_providers: + # Multi providers + if provider in supported_providers: + return provider + else: + raise ERROR_INVALID_PARAMETER( + key="provider", reason=f"Not supported provider: {provider}" + ) + else: + # Single provider + return provider if provider else plugin_info.get("provider") + + @staticmethod + def _check_schedule(schedule: dict) -> None: + if schedule.get("state") == "ENABLED": + if hours := schedule.get("hours"): + if len(hours) > 2: + raise ERROR_INVALID_PARAMETER( + key="schedule.hours", reason="Maximum 2 hours 
can be set." + )