Skip to content

Commit

Permalink
Merge pull request #4 from ImMin5/master
Browse files Browse the repository at this point in the history
Add Collector service
  • Loading branch information
ImMin5 authored Nov 28, 2024
2 parents 824fc53 + d007fa6 commit 4174905
Show file tree
Hide file tree
Showing 28 changed files with 2,878 additions and 2 deletions.
4 changes: 3 additions & 1 deletion src/spaceone/inventory_v2/interface/grpc/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from spaceone.core.pygrpc.server import GRPCServer
from .region import Region
from .collector import Collector

_all_ = ["app"]

app = GRPCServer()
app.add_service(Region)
app.add_service(Region)
app.add_service(Collector)
66 changes: 66 additions & 0 deletions src/spaceone/inventory_v2/interface/grpc/collector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from google.protobuf.json_format import ParseDict
from spaceone.api.inventory_v2.v1 import collector_pb2, collector_pb2_grpc
from spaceone.api.inventory_v2.v1 import job_pb2
from spaceone.core.pygrpc import BaseAPI

from spaceone.inventory_v2.service.collector_service import CollectorService


class Collector(BaseAPI, collector_pb2_grpc.CollectorServicer):
pb2 = collector_pb2
pb2_grpc = collector_pb2_grpc

def create(self, request, context):
params, metadata = self.parse_request(request, context)
collector_svc = CollectorService(metadata)
response: dict = collector_svc.create(params)
return self.dict_to_message(response)

def update(self, request, context):
params, metadata = self.parse_request(request, context)
collector_svc = CollectorService(metadata)
response: dict = collector_svc.update(params)
return self.dict_to_message(response)

def delete(self, request, context):
params, metadata = self.parse_request(request, context)
collector_svc = CollectorService(metadata)
collector_svc.delete(params)
return self.empty()

def get(self, request, context):
params, metadata = self.parse_request(request, context)

collector_svc = CollectorService(metadata)
response: dict = collector_svc.get(params)
return self.dict_to_message(response)

def list(self, request, context):
params, metadata = self.parse_request(request, context)
collector_svc = CollectorService(metadata)
response: dict = collector_svc.list(params)
return self.dict_to_message(response)

def stat(self, request, context):
params, metadata = self.parse_request(request, context)
collector_svc = CollectorService(metadata)
response: dict = collector_svc.stat(params)
return self.dict_to_message(response)

def collect(self, request, context):
params, metadata = self.parse_request(request, context)
collector_svc = CollectorService(metadata)
response: dict = collector_svc.collect(params)
return ParseDict(response, job_pb2.JobInfo())

def update_plugin(self, request, context):
params, metadata = self.parse_request(request, context)
collector_svc = CollectorService(metadata)
response: dict = collector_svc.update_plugin(params)
return self.dict_to_message(response)

def verify_plugin(self, request, context):
params, metadata = self.parse_request(request, context)
collector_svc = CollectorService(metadata)
collector_svc.verify_plugin(params)
return self.empty()
100 changes: 100 additions & 0 deletions src/spaceone/inventory_v2/manager/collection_state_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import logging
from typing import Union, Tuple, List

from spaceone.core.model.mongo_model import QuerySet
from spaceone.core.manager import BaseManager
from spaceone.inventory_v2.model.collection_state.database import CollectionState

_LOGGER = logging.getLogger(__name__)


class CollectionStateManager(BaseManager):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.collector_id = self.transaction.get_meta("collector_id")
self.job_task_id = self.transaction.get_meta("job_task_id")
self.secret_id = self.transaction.get_meta("secret.secret_id")
self.collection_state_model = CollectionState

def create_collection_state(self, cloud_service_id: str, domain_id: str) -> None:
def _rollback(vo: CollectionState):
_LOGGER.info(
f"[ROLLBACK] Delete collection state: cloud_service_id = {vo.cloud_service_id}, "
f"collector_id = {vo.collector_id}"
)
vo.terminate()

if self.collector_id and self.job_task_id and self.secret_id:
state_data = {
"collector_id": self.collector_id,
"job_task_id": self.job_task_id,
"secret_id": self.secret_id,
"cloud_service_id": cloud_service_id,
"domain_id": domain_id,
}

state_vo = self.collection_state_model.create(state_data)
self.transaction.add_rollback(_rollback, state_vo)

def update_collection_state_by_vo(
self, params: dict, state_vo: CollectionState
) -> CollectionState:
def _rollback(old_data):
_LOGGER.info(
f"[ROLLBACK] Revert collection state : cloud_service_id = {state_vo.cloud_service_id}, "
f"collector_id = {state_vo.collector_id}"
)
state_vo.update(old_data)

self.transaction.add_rollback(_rollback, state_vo.to_dict())
return state_vo.update(params)

def reset_collection_state(self, state_vo: CollectionState) -> None:
if self.job_task_id:
params = {"disconnected_count": 0, "job_task_id": self.job_task_id}

self.update_collection_state_by_vo(params, state_vo)

def get_collection_state(
self, cloud_service_id: str, domain_id: str
) -> Union[CollectionState, None]:
if self.collector_id and self.secret_id:
state_vos = self.collection_state_model.filter(
collector_id=self.collector_id,
secret_id=self.secret_id,
cloud_service_id=cloud_service_id,
domain_id=domain_id,
)

if state_vos.count() > 0:
return state_vos[0]

return None

def filter_collection_states(self, **conditions) -> QuerySet:
return self.collection_state_model.filter(**conditions)

def list_collection_states(self, query: dict) -> Tuple[QuerySet, int]:
return self.collection_state_model.query(**query)

def delete_collection_state_by_cloud_service_id(
self, resource_id: str, domain_id: str
) -> None:
state_vos = self.filter_collection_states(
cloud_service_id=resource_id, domain_id=domain_id
)
state_vos.delete()

def delete_collection_state_by_cloud_service_ids(
self, cloud_service_ids: List[str]
) -> None:
state_vos = self.filter_collection_states(cloud_service_id=cloud_service_ids)
state_vos.delete()

def delete_collection_state_by_collector_id(
self, collector_id: str, domain_id: str
) -> None:
state_vos = self.filter_collection_states(
collector_id=collector_id, domain_id=domain_id
)
state_vos.delete()
90 changes: 90 additions & 0 deletions src/spaceone/inventory_v2/manager/collector_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import logging
from typing import Tuple, Union
from datetime import datetime
from spaceone.core.manager import BaseManager
from spaceone.core.model.mongo_model import QuerySet
from spaceone.inventory_v2.model.collector.database import Collector

__ALL__ = ["CollectorManager"]

_LOGGER = logging.getLogger(__name__)


class CollectorManager(BaseManager):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.collector_model = Collector()

def create_collector(self, params: dict) -> Collector:
def _rollback(vo: Collector):
_LOGGER.info(f"[ROLLBACK] Delete collector : {vo.name} ({vo.collector_id})")
vo.delete()

collector_vo: Collector = self.collector_model.create(params)
self.transaction.add_rollback(_rollback, collector_vo)
return collector_vo

def update_collector_by_vo(
self, params: dict, collector_vo: Collector
) -> Collector:
def _rollback(old_data):
_LOGGER.info(f"[ROLLBACK] Revert Data : {old_data.get('collector_id')}")
collector_vo.update(old_data)

self.transaction.add_rollback(_rollback, collector_vo.to_dict())
return collector_vo.update(params)

def enable_collector(
self, collector_id: str, domain_id: str, workspace_id: str = None
):
collector_vo: Collector = self.collector_model.get(
collector_id=collector_id, domain_id=domain_id, workspace_id=workspace_id
)

return self.update_collector_by_vo({"state": "ENABLED"}, collector_vo)

def disable_collector(
self, collector_id: str, domain_id: str, workspace_id: str = None
):
collector_vo: Collector = self.collector_model.get(
collector_id=collector_id, domain_id=domain_id, workspace_id=workspace_id
)

return self.update_collector_by_vo({"state": "DISABLED"}, collector_vo)

@staticmethod
def delete_collector_by_vo(collector_vo: Collector) -> None:
collector_vo.delete()

def get_collector(
self,
collector_id: str,
domain_id: str,
workspace_id: Union[list, str, None] = None,
) -> Collector:
conditions = {
"collector_id": collector_id,
"domain_id": domain_id,
}

if workspace_id:
conditions.update({"workspace_id": workspace_id})

return self.collector_model.get(**conditions)

def filter_collector(self, **conditions) -> QuerySet:
return self.collector_model.filter(**conditions)

def list_collectors(self, query: dict) -> Tuple[QuerySet, int]:
return self.collector_model.query(**query)

def stat_collectors(self, query: dict) -> dict:
return self.collector_model.stat(**query)

def update_last_collected_time(self, collector_vo: Collector):
_LOGGER.debug(
f"[update_last_collected_time] updated collected at: {collector_vo.collector_id}"
)
self.update_collector_by_vo(
{"last_collected_at": datetime.utcnow()}, collector_vo
)
49 changes: 49 additions & 0 deletions src/spaceone/inventory_v2/manager/collector_plugin_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import logging
from typing import Generator, Union
from spaceone.core.manager import BaseManager
from spaceone.core.connector.space_connector import SpaceConnector

__ALL__ = ["CollectorPluginManager"]

_LOGGER = logging.getLogger(__name__)


class CollectorPluginManager(BaseManager):
def init_plugin(self, endpoint: str, options: dict) -> dict:
plugin_connector: SpaceConnector = self.locator.get_connector(
SpaceConnector, endpoint=endpoint, token="NO_TOKEN"
)
return plugin_connector.dispatch("Collector.init", {"options": options})

def verify_plugin(self, endpoint: str, options: dict, secret_data: dict) -> None:
plugin_connector: SpaceConnector = self.locator.get_connector(
SpaceConnector, endpoint=endpoint, token="NO_TOKEN"
)
params = {"options": options, "secret_data": secret_data}
plugin_connector.dispatch("Collector.verify", params)

def collect(
self,
endpoint: str,
options: dict,
secret_data: dict,
task_options: dict = None,
) -> Generator[dict, None, None]:
plugin_connector: SpaceConnector = self.locator.get_connector(
SpaceConnector, endpoint=endpoint, token="NO_TOKEN"
)

params = {"options": options, "secret_data": secret_data, "filter": {}}

if task_options:
params["task_options"] = task_options

return plugin_connector.dispatch("Collector.collect", params)

def get_tasks(self, endpoint: str, secret_data: dict, options: dict) -> dict:
plugin_connector: SpaceConnector = self.locator.get_connector(
SpaceConnector, endpoint=endpoint, token="NO_TOKEN"
)

params = {"options": options, "secret_data": secret_data}
return plugin_connector.dispatch("Job.get_tasks", params)
Loading

0 comments on commit 4174905

Please sign in to comment.