diff --git a/deploy/helm/Chart.yaml b/deploy/helm/Chart.yaml
index c4ff886..3c3501c 100644
--- a/deploy/helm/Chart.yaml
+++ b/deploy/helm/Chart.yaml
@@ -15,7 +15,7 @@ type: application
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
 # Versions are expected to follow Semantic Versioning (https://semver.org/)
-version: 0.0.1
+version: 0.0.2
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application. Versions are not expected to
 # follow Semantic Versioning. They should reflect the version the application is using.
diff --git a/deploy/helm/values.yaml b/deploy/helm/values.yaml
index d7ddb19..7f4c8b8 100644
--- a/deploy/helm/values.yaml
+++ b/deploy/helm/values.yaml
@@ -5,7 +5,7 @@ enabled: true
 developer: false
 grpc: true
 scheduler: false
-worker: false
+worker: true
 rest: false
 name: inventory-v2
 image:
@@ -100,8 +100,17 @@ application_scheduler:
 
 # Overwrite worker config
 application_worker:
-  QUEUES: {}
-  WORKERS: {}
+  QUEUES:
+    inventory_q:
+      backend: spaceone.core.queue.redis_queue.RedisQueue
+      host: redis
+      port: 6379
+      channel: inventory_job
+  WORKERS:
+    inventory_worker:
+      backend: spaceone.core.scheduler.worker.BaseWorker
+      queue: inventory_q
+      pool: 1
 
 ##########################
 # local sidecar
diff --git a/src/spaceone/inventory_v2/manager/job_task_manager.py b/src/spaceone/inventory_v2/manager/job_task_manager.py
index 9c16478..0385194 100644
--- a/src/spaceone/inventory_v2/manager/job_task_manager.py
+++ b/src/spaceone/inventory_v2/manager/job_task_manager.py
@@ -10,6 +10,7 @@ from spaceone.core.model.mongo_model import QuerySet
 
 from spaceone.inventory_v2.manager.job_manager import JobManager
 
+
 # from spaceone.inventory.manager.cleanup_manager import CleanupManager
 from spaceone.inventory_v2.model.job_task.database import JobTask
 
@@ -31,11 +32,11 @@ def _rollback(vo: JobTask):
         return job_task_vo
 
     def get(
-            self,
-            job_task_id: str,
-            domain_id: str,
-            workspace_id: str = None,
-            user_projects: list = None,
+        self,
+        job_task_id: str,
+        domain_id: str,
+        workspace_id: str = None,
+        user_projects: list = None,
     ) -> JobTask:
         conditions = {
             "job_task_id": job_task_id,
@@ -60,17 +61,32 @@ def stat(self, query: dict) -> dict:
         return self.job_task_model.stat(**query)
 
     def push_job_task(self, params: dict) -> None:
-        task = self.create_task_pipeline(copy.deepcopy(params))
+        token = self.transaction.meta.get("token")
+        params["token"] = token
+        task = {
+            "name": "collecting_resources",
+            "version": "v1",
+            "executionEngine": "BaseWorker",
+            "stages": [
+                {
+                    "locator": "MANAGER",
+                    "name": "CollectingManager",
+                    "metadata": {},
+                    "method": "collecting_resources",
+                    "params": {"params": params},
+                }
+            ],
+        }
+
         validate(task, schema=SPACEONE_TASK_SCHEMA)
-        json_task = json.dumps(task)
-        queue.put(self.get_queue_name(name="collect_queue"), json_task)
+        queue.put("inventory_q", utils.dump_json(task))
 
     @staticmethod
     def add_error(
-            job_task_vo: JobTask,
-            error_code: str,
-            error_message: str,
-            additional: dict = None,
+        job_task_vo: JobTask,
+        error_code: str,
+        error_message: str,
+        additional: dict = None,
     ) -> None:
         error_info = {"error_code": error_code, "message": str(error_message).strip()}
 
@@ -84,10 +100,10 @@ def add_error(
 
     @staticmethod
     def _update_job_status_by_vo(
-            job_task_vo: JobTask,
-            status: str,
-            started_at: datetime = None,
-            finished_at: datetime = None,
+        job_task_vo: JobTask,
+        status: str,
+        started_at: datetime = None,
+        finished_at: datetime = None,
     ) -> None:
         params = {"status": status}
 
@@ -104,8 +120,8 @@ def _update_job_status_by_vo(
         job_task_vo.update(params)
 
     def make_inprogress_by_vo(
-            self,
-            job_task_vo: JobTask,
+        self,
+        job_task_vo: JobTask,
     ) -> None:
         if job_task_vo.status == "PENDING":
             self._update_job_status_by_vo(
@@ -115,8 +131,8 @@
             )
 
     def make_success_by_vo(
-            self,
-            job_task_vo: JobTask,
+        self,
+        job_task_vo: JobTask,
     ) -> None:
         self._update_job_status_by_vo(
             job_task_vo,
@@ -125,9 +141,9 @@
         )
 
     def make_failure_by_vo(
-            self,
-            job_task_vo: JobTask,
-            collecting_count_info: dict = None,
+        self,
+        job_task_vo: JobTask,
+        collecting_count_info: dict = None,
     ) -> None:
         self._update_job_status_by_vo(
             job_task_vo,
@@ -138,7 +154,7 @@
         self.decrease_remained_sub_tasks(job_task_vo, collecting_count_info)
 
     def decrease_remained_sub_tasks(
-            self, job_task_vo: JobTask, collecting_count_info: dict = None
+        self, job_task_vo: JobTask, collecting_count_info: dict = None
     ) -> JobTask:
         if collecting_count_info:
             self._update_collecting_count_info(job_task_vo, collecting_count_info)
@@ -165,7 +181,7 @@ def decrease_remained_sub_tasks(
 
     @staticmethod
     def _update_collecting_count_info(
-            job_task_vo: JobTask, collecting_count_info: dict
+        job_task_vo: JobTask, collecting_count_info: dict
     ) -> None:
         _LOGGER.debug(
             f"[_update_collecting_count_info] update collecting count => {utils.dump_json(collecting_count_info)}"
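
For reference, a minimal standalone sketch of the message shape that `push_job_task` now puts on the `inventory_q` queue (the task structure and names are taken from the diff above; the `params` values and token are placeholders, and `utils.dump_json` is assumed to serialize like plain `json.dumps`):

```python
import json

# Placeholder collect parameters; in the service these come from the caller,
# and push_job_task injects the transaction token before enqueueing.
params = {
    "job_task_id": "job-task-xxxx",   # hypothetical id for illustration
    "domain_id": "domain-xxxx",
    "token": "<service-token>",
}

# Same task structure built in push_job_task: one stage that dispatches to
# CollectingManager.collecting_resources via the MANAGER locator.
task = {
    "name": "collecting_resources",
    "version": "v1",
    "executionEngine": "BaseWorker",
    "stages": [
        {
            "locator": "MANAGER",
            "name": "CollectingManager",
            "metadata": {},
            "method": "collecting_resources",
            "params": {"params": params},
        }
    ],
}

# Assuming utils.dump_json(task) produces plain JSON, this is the payload the
# inventory_worker configured in values.yaml pops from the inventory_job channel.
print(json.dumps(task, indent=2))
```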