Skip to content

Commit

Permalink
feat: modify job and job_task apis
Browse files Browse the repository at this point in the history
  • Loading branch information
ImMin5 committed Dec 21, 2023
1 parent c657016 commit 6dd5531
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 77 deletions.
68 changes: 38 additions & 30 deletions src/spaceone/cost_analysis/connector/datasource_plugin_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,71 +7,79 @@
from spaceone.core.utils import parse_endpoint
from spaceone.core.error import *

__all__ = ['DataSourcePluginConnector']
__all__ = ["DataSourcePluginConnector"]

_LOGGER = logging.getLogger(__name__)


class DataSourcePluginConnector(BaseConnector):

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.client = None
self.secret_data = None
self.options = None
self.schema = None
self.schema_id = None

def initialize(self, endpoint):
static_endpoint = self.config.get('endpoint')
static_endpoint = self.config.get("endpoint")

if static_endpoint:
endpoint = static_endpoint

e = parse_endpoint(endpoint)
self.client = pygrpc.client(endpoint=f'{e.get("hostname")}:{e.get("port")}', version='plugin')
self.client = pygrpc.client(
endpoint=f'{e.get("hostname")}:{e.get("port")}', version="plugin"
)

self.secret_data = self.config.get('secret_data')
self.options = self.config.get('options')
self.schema = self.config.get('schema')
self.secret_data = self.config.get("secret_data")
self.options = self.config.get("options")
self.schema_id = self.config.get("schema_id")

def init(self, options, domain_id):
response = self.client.DataSource.init({
'options': options,
'domain_id': domain_id
})
response = self.client.DataSource.init(
{"options": options, "domain_id": domain_id}
)

return self._change_message(response)

def verify(self, options, secret_data, schema, domain_id):
def verify(self, options, secret_data, schema_id, domain_id):
params = {
'options': self.options or options,
'secret_data': self.secret_data or secret_data,
'schema': self.schema or schema,
'domain_id': domain_id
"options": self.options or options,
"secret_data": self.secret_data or secret_data,
"schema": self.schema_id or schema_id,
"domain_id": domain_id,
}

self.client.DataSource.verify(params)

def get_tasks(self, options, secret_data, schema, domain_id, start=None, last_synchronized_at=None):
def get_tasks(
self,
options: dict,
secret_data: dict,
schema_id: str,
domain_id: str,
start: str = None,
last_synchronized_at: str = None,
):
params = {
'options': self.options or options,
'secret_data': self.secret_data or secret_data,
'schema': self.schema or schema,
'start': start,
'last_synchronized_at': last_synchronized_at,
'domain_id': domain_id
"options": self.options or options,
"secret_data": self.secret_data or secret_data,
"schema": self.schema_id or schema_id,
"start": start,
"last_synchronized_at": last_synchronized_at,
"domain_id": domain_id,
}

response = self.client.Job.get_tasks(params)
return self._change_message(response)

def get_cost_data(self, options, secret_data, schema, task_options, domain_id):
def get_cost_data(self, options, secret_data, schema_id, task_options, domain_id):
params = {
'options': self.options or options,
'secret_data': self.secret_data or secret_data,
'schema': self.schema or schema,
'task_options': task_options,
'domain_id': domain_id
"options": self.options or options,
"secret_data": self.secret_data or secret_data,
"schema": self.schema_id or schema_id,
"task_options": task_options,
"domain_id": domain_id,
}

response_stream = self.client.Cost.get_data(params)
Expand Down
26 changes: 13 additions & 13 deletions src/spaceone/cost_analysis/manager/data_source_plugin_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,32 +35,32 @@ def init_plugin(self, options, domain_id):

return plugin_metadata

def verify_plugin(self, options, secret_data, schema, domain_id):
self.dsp_connector.verify(options, secret_data, schema, domain_id)
def verify_plugin(self, options, secret_data, schema_id, domain_id):
self.dsp_connector.verify(options, secret_data, schema_id, domain_id)

def get_tasks(
self,
options,
secret_id,
secret_data,
schema,
start,
last_synchronized_at,
domain_id,
options: dict,
secret_id: str,
secret_data: str,
schema_id: str,
start: str,
last_synchronized_at: str,
domain_id: str,
):
response = self.dsp_connector.get_tasks(
options, secret_data, schema, domain_id, start, last_synchronized_at
options, secret_data, schema_id, domain_id, start, last_synchronized_at
)
tasks = response.get("tasks", [])

for task in tasks:
task.update({"secret_id": secret_id, "schema": schema})
task.update({"secret_id": secret_id, "schema": schema_id})

return tasks, response.get("changed", [])

def get_cost_data(self, options, secret_data, schema, task_options, domain_id):
def get_cost_data(self, options, secret_data, schema_id, task_options, domain_id):
return self.dsp_connector.get_cost_data(
options, secret_data, schema, task_options, domain_id
options, secret_data, schema_id, task_options, domain_id
)

def get_data_source_plugin_endpoint_by_vo(self, data_source_vo: DataSource):
Expand Down
2 changes: 1 addition & 1 deletion src/spaceone/cost_analysis/manager/job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ def create_job(
job_options["start"] = job_options.get("start")

data = {
"data_source_id": data_source_id,
"resource_group": resource_group,
"data_source_id": data_source_id,
"workspace_id": workspace_id,
"domain_id": domain_id,
"options": job_options,
Expand Down
12 changes: 11 additions & 1 deletion src/spaceone/cost_analysis/manager/job_task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,20 @@ def __init__(self, *args, **kwargs):
self.job_mgr: JobManager = self.locator.get_manager("JobManager")
self.job_task_model: JobTask = self.locator.get_model("JobTask")

def create_job_task(self, job_id, data_source_id, domain_id, task_options):
def create_job_task(
self,
resource_group: str,
job_id: str,
data_source_id: str,
workspace_id: str,
domain_id: str,
task_options: dict,
):
data = {
"resource_group:": resource_group,
"job_id": job_id,
"data_source_id": data_source_id,
"workspace_id": workspace_id,
"domain_id": domain_id,
"options": task_options,
}
Expand Down
24 changes: 11 additions & 13 deletions src/spaceone/cost_analysis/manager/plugin_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,21 @@


class PluginManager(BaseManager):

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.plugin_connector: SpaceConnector = self.locator.get_connector('SpaceConnector', service='plugin',
token=config.get_global('TOKEN'))
self.plugin_connector: SpaceConnector = self.locator.get_connector(
SpaceConnector, service="plugin", token=config.get_global("TOKEN")
)

def get_plugin_endpoint(self, plugin_info, domain_id):
plugin_connector: SpaceConnector = self.locator.get_connector('SpaceConnector', service='plugin',
token=config.get_global('TOKEN'))
response = plugin_connector.dispatch(
'Plugin.get_plugin_endpoint',
response = self.plugin_connector.dispatch(
"Plugin.get_plugin_endpoint",
{
'plugin_id': plugin_info['plugin_id'],
'version': plugin_info.get('version'),
'upgrade_mode': plugin_info.get('upgrade_mode', 'AUTO'),
'domain_id': domain_id
}
"plugin_id": plugin_info["plugin_id"],
"version": plugin_info.get("version"),
"upgrade_mode": plugin_info.get("upgrade_mode", "AUTO"),
"domain_id": domain_id,
},
)

return response['endpoint'], response.get('updated_version')
return response["endpoint"], response.get("updated_version")
5 changes: 3 additions & 2 deletions src/spaceone/cost_analysis/service/cost_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ def create(self, params):
'tags': 'dict',
'additional_info': 'dict',
'service_account_id': 'str',
'project_id': 'str',
'project_id': 'str', # injected from auth
'data_source_id': 'str',
'billed_date': 'str',
'domain_id': 'str'
'workspace_id': 'str', # injected from auth
'domain_id': 'str' # injected from auth
}
Returns:
Expand Down
4 changes: 2 additions & 2 deletions src/spaceone/cost_analysis/service/data_source_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,13 +568,13 @@ def _verify_plugin(self, endpoint, plugin_info, domain_id):
options = plugin_info.get("options", {})
secret_id = plugin_info.get("secret_id")
secret_data = plugin_info.get("secret_data")
schema = plugin_info.get("schema")
schema_id = plugin_info.get("schema_id")

if not secret_data:
secret_data = self._get_secret_data(secret_id, domain_id)

self.ds_plugin_mgr.initialize(endpoint)
self.ds_plugin_mgr.verify_plugin(options, secret_data, schema, domain_id)
self.ds_plugin_mgr.verify_plugin(options, secret_data, schema_id, domain_id)

def _get_secret_data(self, secret_id, domain_id):
secret_mgr: SecretManager = self.locator.get_manager("SecretManager")
Expand Down
33 changes: 18 additions & 15 deletions src/spaceone/cost_analysis/service/job_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ def get_cost_data(self, params):

try:
options = plugin_info.get("options", {})
schema = plugin_info.get("schema")
schema_id = plugin_info.get("schema_id")
tag_keys = data_source_vo.cost_tag_keys
additional_info_keys = data_source_vo.cost_additional_info_keys
data_keys = data_source_vo.cost_data_keys
Expand Down Expand Up @@ -248,7 +248,7 @@ def get_cost_data(self, params):
is_canceled = False
_LOGGER.debug(f"[get_cost_data] start job ({job_task_id}): {start_dt}")
for costs_data in self.ds_plugin_mgr.get_cost_data(
options, secret_data, schema, task_options, domain_id
options, secret_data, schema_id, task_options, domain_id
):
results = costs_data.get("results", [])
for cost_data in results:
Expand Down Expand Up @@ -295,20 +295,16 @@ def create_cost_job(self, data_source_vo: DataSource, job_options):
tasks = []
changed = []

data_source_id = data_source_vo.data_source_id
resource_group = data_source_vo.resource_group
data_source_id = data_source_vo.data_source_id
workspace_id = data_source_vo.workspace_id
domain_id = data_source_vo.domain_id

if resource_group == "WORKSPACE":
workspace_id = data_source_vo.workspace_id
else:
workspace_id = "*"

endpoint = self.ds_plugin_mgr.get_data_source_plugin_endpoint_by_vo(
data_source_vo
)
options = data_source_vo.plugin_info.options
schema = data_source_vo.plugin_info.schema
schema_id = data_source_vo.plugin_info.schema_id

if data_source_vo.secret_type:
secret_type = data_source_vo.secret_type
Expand All @@ -335,7 +331,7 @@ def create_cost_job(self, data_source_vo: DataSource, job_options):
options,
secret_id,
secret_data,
schema,
schema_id,
start,
last_synchronized_at,
domain_id,
Expand Down Expand Up @@ -376,7 +372,12 @@ def create_cost_job(self, data_source_vo: DataSource, job_options):
task_options = task["task_options"]
try:
job_task_vo = self.job_task_mgr.create_job_task(
job_vo.job_id, data_source_id, domain_id, task_options
job_vo.resource_group,
job_vo.job_id,
data_source_id,
job_vo.workspace_id,
domain_id,
task_options,
)
self.job_task_mgr.push_job_task(
{
Expand Down Expand Up @@ -442,8 +443,8 @@ def _set_secret_filter(secret_filter, provider):
{"k": "secret_id", "v": secret_filter["secrets"], "o": "in"}
)
if (
"service_accounts" in secret_filter
and secret_filter["service_accounts"]
"service_accounts" in secret_filter
and secret_filter["service_accounts"]
):
_filter.append(
{
Expand Down Expand Up @@ -532,6 +533,8 @@ def _create_cost_data(self, cost_data, job_task_vo, cost_options):

if "project_id" in cost_options:
cost_data["project_id"] = cost_options["project_id"]
else:
project_id = "*"

self.cost_mgr.create_cost(cost_data, execute_rollback=False)

Expand Down Expand Up @@ -727,7 +730,7 @@ def _aggregate_cost_data(self, job_vo: Job):

for job_task_id in job_task_ids:
for billed_month in self._distinct_billed_month(
data_source_id, domain_id, job_id, job_task_id
data_source_id, domain_id, job_id, job_task_id
):
self._aggregate_monthly_cost_data(
data_source_id, domain_id, job_id, job_task_id, billed_month
Expand All @@ -753,7 +756,7 @@ def _distinct_billed_month(self, data_source_id, domain_id, job_id, job_task_id)
return values

def _aggregate_monthly_cost_data(
self, data_source_id, domain_id, job_id, job_task_id, billed_month
self, data_source_id, domain_id, job_id, job_task_id, billed_month
):
query = {
"group_by": [
Expand Down

0 comments on commit 6dd5531

Please sign in to comment.