Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task/cimi deployment prototype #413

Open
wants to merge 35 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
64c3ad5
wip
0xbase12 Aug 8, 2018
32ed932
wip
0xbase12 Aug 9, 2018
e8a27fc
good till sending reports and ss-get ss-set ss-display ss-abort comma…
0xbase12 Aug 15, 2018
d91435f
Update BaseCloudConnector.py
0xbase12 Aug 21, 2018
022bbac
Update NodeDecorator.py
0xbase12 Aug 21, 2018
a871773
Update NodeInstance.py
0xbase12 Aug 21, 2018
5d69845
Update NodeDecorator.py
0xbase12 Aug 21, 2018
c4d2117
Update NodeInstance.py
0xbase12 Aug 21, 2018
7c4694b
Update BaseCloudConnector.py
0xbase12 Aug 21, 2018
bf63c73
Update BaseCloudConnector.py
0xbase12 Aug 21, 2018
fafcd54
Update BaseCloudConnector.py
0xbase12 Aug 21, 2018
4810e2a
Update BaseCloudConnector.py
0xbase12 Aug 21, 2018
3d8b42e
Update UserInfo.py
0xbase12 Aug 21, 2018
faa38f6
wip
0xbase12 Aug 21, 2018
352f12b
fix check cimi deployment
0xbase12 Aug 21, 2018
9a202bf
Update BaseCloudConnector.py
0xbase12 Aug 22, 2018
9cfe171
Update NodeInstance.py
0xbase12 Aug 22, 2018
b904f16
Update NodeInstance.py
0xbase12 Aug 22, 2018
77b4730
merge with compatibily workaround for old and new deployment
0xbase12 Aug 22, 2018
ba00117
Merge branch 'cimi_deployment_protoype_compatible_workaround' into ta…
0xbase12 Aug 22, 2018
b18cc7f
fix complete state and fix set runtime parameter and get runtime para…
0xbase12 Aug 24, 2018
6eaf0a3
comment non used is_mutable and scale_operational
0xbase12 Aug 27, 2018
ba3d413
Merge branch 'master' into task/cimi-deployment-prototype
0xbase12 Sep 13, 2018
f40f0df
add ssh keys on executing
0xbase12 Sep 28, 2018
ea3d6a2
support cimi dep subtarget
0xbase12 Oct 10, 2018
e8267d0
Merge branch 'master' into task/cimi-deployment-prototype
0xbase12 Oct 10, 2018
907b114
remove recusive lookup, fix vm name at startup
0xbase12 Oct 29, 2018
aac2de5
minor
0xbase12 Oct 31, 2018
b2685b9
pubkey is a collection of keys in cimi deployment
0xbase12 Nov 8, 2018
1f0699f
onProvisioning
0xbase12 Nov 23, 2018
5282759
bootstrap direct command force to python 2 and minor
0xbase12 Dec 3, 2018
6532bc9
minor
0xbase12 Dec 3, 2018
8e8b9cd
force python2 for ss-client everywhere
0xbase12 Dec 4, 2018
4d494aa
set_cloud_node_ssh_url with port mapping when available
0xbase12 Dec 4, 2018
07910c2
do not wait for stderr when exit success
0xbase12 Dec 11, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 50 additions & 11 deletions client/src/main/python/slipstream/Client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import subprocess
from multiprocessing.dummy import Pool as ThreadPool


from SlipStreamHttpClient import SlipStreamHttpClient

from exceptions.Exceptions import NotYetSetException
Expand Down Expand Up @@ -90,6 +89,41 @@ def getRuntimeParameter(self, key):

return value

def kb_extract_param_name_node_name(self, key):
node_name = None
param_name = key
if NodeDecorator.NODE_PROPERTY_SEPARATOR in key:
if not key.startswith(NodeDecorator.globalNamespacePrefix):
param_split = key.split(NodeDecorator.NODE_PROPERTY_SEPARATOR)
node_name = param_split[0]
param_name = param_split[1]
else:
node_name = self._getNodeName()
return node_name, param_name

def kb_getRuntimeParameter(self, key):
node_name, param_name = self.kb_extract_param_name_node_name(key)
if self.no_block:
value = self.httpClient.kb_get_deployment_parameter(param_name, node_name)
else:
timer = 0
while True:
value = self.httpClient.kb_get_deployment_parameter(key, node_name)

if value is not None:
break
if self.timeout != 0 and timer >= self.timeout:
raise TimeoutException(
"Exceeded timeout limit of %s waiting for key '%s' "
"to be set" % (self.timeout, key))
print >> sys.stderr, "Waiting for %s" % key
sys.stdout.flush()
sleepTime = 5
time.sleep(sleepTime)
timer += sleepTime

return value

def launchDeployment(self, params):
"""
@return: Run location
Expand Down Expand Up @@ -139,10 +173,10 @@ def _qualifyKey(self, key):
# multiplicity parameter should NOT be qualified make an exception
if len(parts) == 1 and propertyPart not in node_level_properties:
_key = nodename + \
NodeDecorator.NODE_MULTIPLICITY_SEPARATOR + \
NodeDecorator.nodeMultiplicityStartIndex + \
NodeDecorator.NODE_PROPERTY_SEPARATOR + \
propertyPart
NodeDecorator.NODE_MULTIPLICITY_SEPARATOR + \
NodeDecorator.nodeMultiplicityStartIndex + \
NodeDecorator.NODE_PROPERTY_SEPARATOR + \
propertyPart
return _key

if _key not in node_level_properties:
Expand Down Expand Up @@ -176,13 +210,18 @@ def setRuntimeParameter(self, key, value):
raise ClientError("value exceeds maximum length of %d characters" % self.VALUE_LENGTH_LIMIT)
self.httpClient.setRuntimeParameter(_key, stripped_value)

def kb_setRuntimeParameter(self, key, value):
node_name, param_name = self.kb_extract_param_name_node_name(key)
self.httpClient.kb_set_deployment_parameter(param_name, util.removeASCIIEscape(value), node_name)

def cancel_abort(self):
# Global abort
self.httpClient.unset_runtime_parameter(NodeDecorator.globalNamespacePrefix + NodeDecorator.ABORT_KEY,
ignore_abort=True)
self.httpClient.kb_unset_deployment_parameter(NodeDecorator.globalNamespacePrefix + NodeDecorator.ABORT_KEY)

self.httpClient.kb_unset_deployment_parameter(NodeDecorator.ABORT_KEY, self._getNodeName())

_key = self._qualifyKey(NodeDecorator.ABORT_KEY)
self.httpClient.unset_runtime_parameter(_key, ignore_abort=True)
self.httpClient.kb_set_deployment_parameter(NodeDecorator.globalNamespacePrefix + NodeDecorator.STATE_KEY,
"Ready")

def executScript(self, script):
return self._systemCall(script, retry=False)
Expand Down Expand Up @@ -265,7 +304,7 @@ def get_rtp_all(self, compname, key):
nparams = len(params)
pool_size = min(POOL_MAX, nparams)
self._printDetail("Get %s RTP instances with pool size: %s" %
(nparams, pool_size))
(nparams, pool_size))
pool = ThreadPool(pool_size)
results = pool.map(self._get_rtp, params)
results = [v or '' for v in results]
Expand All @@ -277,4 +316,4 @@ def get_session(self):
return self.httpClient.get_session()

def get_api(self):
return self.httpClient.get_api()
return self.httpClient.get_api()
50 changes: 49 additions & 1 deletion client/src/main/python/slipstream/SlipStreamHttpClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from __future__ import print_function

import os
import uuid
import json
from collections import defaultdict

Expand All @@ -38,6 +39,7 @@ class SlipStreamHttpClient(object):
def __init__(self, configHolder):
self.category = None
self.run_dom = None
self.kb_deployment = None
self.ignoreAbort = False
self.username = ''
self.password = ''
Expand Down Expand Up @@ -137,7 +139,7 @@ def get_nodes_instances(self, cloud_service_name=None):
'''
nodes_instances = {}

self._retrieveAndSetRun()
self._kb_retrieveAndSetRun()

nodes_instances_runtime_parameters = \
DomExtractor.extract_nodes_instances_runtime_parameters(self.run_dom, cloud_service_name)
Expand Down Expand Up @@ -168,6 +170,15 @@ def get_nodes_instances(self, cloud_service_name=None):

return nodes_instances

def kb_get_node_instance(self, node_name):
type = self.kb_get_run_type()
if type in ('COMPONENT', 'IMAGE'):
return self.kb_deployment['module']
elif type == 'APPLICATION': # FIXME more than the module returned
nodes = self.kb_deployment['module']['content']['nodes']
return filter(lambda d: d['node'] == node_name, nodes)[0]


def _get_nodename(self):
'Node name derived from the node instance name.'
return self.node_instance_name.split(
Expand All @@ -194,6 +205,17 @@ def _retrieveAndSetRun(self):
_, run = self._retrieve(url)
self.run_dom = etree.fromstring(run.encode('utf-8'))

def kb_get_run_type(self):
return self._kb_retrieveAndSetRun()['module']['type']

def kb_get_userparam_ssh_pubkeys(self):
return self._kb_retrieveAndSetRun().get('sshPublicKeys')

def _kb_retrieveAndSetRun(self):
if self.kb_deployment is None:
self.kb_deployment = self.api.cimi_get(self.diid).json
return self.kb_deployment

def _retrieve(self, url):
return self._httpGet(url, 'application/xml')

Expand All @@ -207,6 +229,9 @@ def complete_state(self, node_instance_name):
url += SlipStreamHttpClient.URL_IGNORE_ABORT_ATTRIBUTE_QUERY
return self._httpPost(url, 'reset', 'text/plain')

def kb_complete_state(self, state, node_instance_name):
return self.kb_set_deployment_parameter(NodeDecorator.COMPLETE_KEY, state, node_instance_name)

def terminate_run(self):
return self._httpDelete(self.run_url)

Expand Down Expand Up @@ -268,6 +293,29 @@ def getRuntimeParameter(self, key, ignoreAbort=False):

return content.strip().strip('"').strip("'")

@staticmethod
def kb_from_data_uuid(text):
class NullNameSpace:
bytes = b''

return str(uuid.uuid3(NullNameSpace, text))

def __contruct_deployment_param_href(self, node_id, param_name):
param_id = ':'.join(item or '' for item in [self.diid, node_id, param_name])
return 'deployment-parameter/' + self.kb_from_data_uuid(param_id)

def kb_get_deployment_parameter(self, param_name, node_id=None):
deployment_parameter_href = self.__contruct_deployment_param_href(node_id, param_name)
return self.api.cimi_get(deployment_parameter_href).json.get('value')

def kb_set_deployment_parameter(self, param_name, value, node_id=None):
deployment_parameter_href = self.__contruct_deployment_param_href(node_id, param_name)
return self.api.cimi_edit(deployment_parameter_href, {'value': value})

def kb_unset_deployment_parameter(self, param_name, node_id=None):
deployment_parameter_href = self.__contruct_deployment_param_href(node_id, param_name)
return self.api.cimi_edit(deployment_parameter_href, {}, select='value')

def setRuntimeParameter(self, key, value, ignoreAbort=False):
url = self.run_url + '/' + key
if self.ignoreAbort or ignoreAbort:
Expand Down
2 changes: 1 addition & 1 deletion client/src/main/python/slipstream/__version__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python2
# coding=latin-1
"""
SlipStream Client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,11 +468,17 @@ def __start_node_instance_and_client(self, user_info, node_instance):

self._print_detail("Starting instance: %s" % node_instance_name)

self.cimi_deployment_prototype = bool(node_instance.get_deployment_context())
node_context = node_instance.get_deployment_context()
self.cimi_deployment_prototype = bool(node_context)

if self.cimi_deployment_prototype:
vm_name = node_instance_name + '--' + node_context.get('SLIPSTREAM_DIID', '').replace('deployment/', '')
else:
vm_name = self._generate_vm_name(node_instance_name)

vm = self._start_image(user_info,
node_instance,
self._generate_vm_name(node_instance_name))
vm_name)

self.__add_vm(vm, node_instance)

Expand Down Expand Up @@ -533,6 +539,12 @@ def _publish_vm_info(self, vm, node_instance):
already_published.add('ssh')
if vm_ports_mapping and 'vm_ports_mapping' not in already_published:
node_instance.set_cloud_node_ports_mapping(vm_ports_mapping)
ssh_found = re.search('tcp:(\d+):22', str(vm_ports_mapping))
if ssh_found:
ssh_username, ssh_password = self.__get_vm_username_password(node_instance)
node_instance.set_cloud_node_ssh_url('ssh://{}@{}:{}'.format(ssh_username.strip(),
vm_ip.strip(),
ssh_found.group(1)))
already_published.add('vm_ports_mapping')
else:
if vm_id and 'id' not in already_published:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def _run_instance(self, node_instance):
verbose_level = self.get_option('verbose') and 3 or 0
ch = ConfigHolder(options={'verboseLevel': verbose_level,
'retry': False,
KEY_RUN_CATEGORY: RUN_CATEGORY_DEPLOYMENT},
KEY_RUN_CATEGORY: RUN_CATEGORY_DEPLOYMENT},
context={'foo': 'bar'},
config={'foo': 'bar'})

Expand Down
2 changes: 1 addition & 1 deletion client/src/main/python/slipstream/command/VMCommandBase.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python2
"""
SlipStream Client
=====
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python2
"""
SlipStream Client
=====
Expand Down
52 changes: 37 additions & 15 deletions client/src/main/python/slipstream/executors/MachineExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def _execute(self):
state = self._get_state()
while True:
self._execute_state(state)
self._complete_state(state)
self._kb_complete_state(state)
state = self._wait_for_next_state(state)

def _get_state(self):
Expand Down Expand Up @@ -131,6 +131,10 @@ def _complete_state(self, state):
if self._need_to_complete(state):
self.wrapper.complete_state()

def _kb_complete_state(self, state):
if self._need_to_complete(state):
self.wrapper.kb_complete_state(state)

@staticmethod
def _failure_msg_from_exception(exception):
"""
Expand Down Expand Up @@ -177,7 +181,7 @@ def _get_sleep_time(self, state):
return self.WAIT_NEXT_STATE_SHORT

def _retrieve_my_node_instance(self):
node_instance = self.wrapper.get_my_node_instance()
node_instance = self.wrapper.kb_get_my_node_instance()
if node_instance is None:
raise ExecutionException("Couldn't get the node instance for the current VM.")
return node_instance
Expand All @@ -189,7 +193,7 @@ def _is_recovery_mode(self):
return self.recovery_mode == True

def _is_mutable(self):
return self.wrapper.is_mutable()
return False # self.wrapper.is_mutable()

def _need_to_complete(self, state):
return state not in ['Finalizing', 'Done', 'Cancelled', 'Aborted']
Expand All @@ -201,6 +205,10 @@ def _execute_execute_target(self):
self._execute_target('execute', abort_on_err=True)
self._set_need_to_send_reports()

def _kb_execute_execute_target(self):
self._kb_execute_target('deployment', abort_on_err=True)
self._set_need_to_send_reports()

def _execute_target(self, target_name, exports=None, abort_on_err=False, ssdisplay=True, ignore_abort=False):
target = self.node_instance.get_image_target(target_name)

Expand Down Expand Up @@ -237,6 +245,24 @@ def _execute_target(self, target_name, exports=None, abort_on_err=False, ssdispl
else:
util.printAndFlush('Nothing to do for script: %s' % full_target_name)

def _kb_get_target(self, target_name):
return self.node_instance['content'].get('targets', {}).get(target_name)

def _kb_execute_target(self, target_name, exports=None, abort_on_err=False, ssdisplay=True, ignore_abort=False):
target = self._kb_get_target(target_name)

if not target:
util.printAndFlush('Nothing to do for script: %s' % target_name)
return

full_target_name = '%s:%s' % (self.node_instance['name'], target_name)
message = "Executing script '%s'" % full_target_name
util.printStep(message)

fail_msg = "Failed running '%s' script on '%s'" % (target_name, self._get_node_instance_name())
for i, sub_t in enumerate(target):
self._launch_script(sub_t, exports, abort_on_err, ignore_abort, fail_msg, '{}[{}]'.format(target_name, i))

def _need_to_execute_build_step(self, target, subtarget):
return MachineExecutor.need_to_execute_build_step(self._get_node_instance(), target, subtarget)

Expand Down Expand Up @@ -340,10 +366,12 @@ def _run_target_script(self, target_script, exports=None, ignore_abort=False, na
util.printDetail("End of the script '%s'" % (_name,))

stderr_last_line = ''
try:
stderr_last_line = result.get(timeout=60)
except Empty:
pass

if process.returncode != self.SCRIPT_EXIT_SUCCESS:
try:
stderr_last_line = result.get(timeout=60)
except Empty:
pass
return process.returncode, stderr_last_line

def _write_target_script_to_file(self, target_script, name=None):
Expand Down Expand Up @@ -446,21 +474,15 @@ def onSendingReports(self):
def onReady(self):
util.printAction('Ready')

def onFinalizing(self):
util.printAction('Finalizing')

if self.wrapper.isAbort():
util.printError("Failed")
else:
util.printAction('Done!')

def onDone(self):
util.printAction('Done!')
self._abort_running_in_final_state()

def onCancelled(self):
self._abort_running_in_final_state()

def onAborted(self):
util.printError("Failed")
self._abort_running_in_final_state()

def _abort_running_in_final_state(self):
Expand Down
Loading