From c2c6290f6273e56b4b60c71d439ecd1cf90b9e9c Mon Sep 17 00:00:00 2001 From: Ding-Yi Chen Date: Wed, 21 Mar 2018 10:14:32 +1000 Subject: [PATCH] feat(ZNTA-1964): Deploy war to server --- JenkinsHelper.py | 316 +++++++++++++++++++++++++++++++++++++++++ ZanataArgParser.py | 174 +++++++++++++++++++++++ ZanataFunctions.py | 137 +++++++++++------- ZanataServer.py | 135 ++++++++++++++++++ py-test-all | 7 +- testZanataArgParser.py | 108 ++++++++++++++ testZanataFunctions.py | 4 + 7 files changed, 827 insertions(+), 54 deletions(-) create mode 100755 JenkinsHelper.py create mode 100755 ZanataArgParser.py create mode 100755 ZanataServer.py create mode 100755 testZanataArgParser.py diff --git a/JenkinsHelper.py b/JenkinsHelper.py new file mode 100755 index 0000000..cc5edb8 --- /dev/null +++ b/JenkinsHelper.py @@ -0,0 +1,316 @@ +#!/usr/bin/env python +"""Jenkins Helper functions +It contains jenkins helper +Run JenkinsHelper --help or JenkinsHelper --help for +detail help.""" + +from __future__ import ( + absolute_import, division, print_function, unicode_literals) +import ast +import logging +import os +import os.path +import re +import sys + +try: + from typing import List, Any # noqa: F401 # pylint: disable=unused-import +except ImportError: + sys.stderr.write("python typing module is not installed" + os.linesep) + +from ZanataFunctions import UrlHelper +from ZanataArgParser import ZanataArgParser # pylint: disable=E0401 + + +try: + # We need to import 'List' and 'Any' for mypy to work + from typing import List, Any # noqa: E501,F401,F811 # pylint: disable=unused-import +except ImportError: + sys.stderr.write("python typing module is not installed" + os.linesep) + + +class JenkinsServer(object): + """JenkinsServer can connect to a Jenkins server""" + def __init__(self, server_url, user, token): + # type: (str, str,str) -> None + self.url_helper = UrlHelper( + server_url, user, token) + self.server_url = server_url + self.user = user + self.token = token + + @classmethod + def add_parser(cls, arg_parser=None, env_sub_commands=None): + # type: (ZanataArgParser) -> ZanataArgParser + """Add JenkinsServer parameters to a parser""" + if not arg_parser: + arg_parser = ZanataArgParser(description=__doc__) + + # Add env + arg_parser.add_env( + 'JENKINS_URL', dest='server_url', required=True, + sub_commands=env_sub_commands) + arg_parser.add_env( + 'ZANATA_JENKINS_USER', dest='user', required=True, + sub_commands=env_sub_commands) + arg_parser.add_env( + 'ZANATA_JENKINS_TOKEN', dest='token', required=True, + sub_commands=env_sub_commands) + return arg_parser + + @classmethod + def init_from_parsed_args(cls, args): + """New an instance from parsed args""" + return cls(args.server_url, args.user, args.token) + + +class JenkinsJob(object): + """JenkinsJob object can access a Jenkins Job""" + + @staticmethod + def dict_get_elem_by_path(dic, path): + # type (dict, str) -> object + """Return the elem in python dictionary given path + for example: you can use a/b to retrieve answer from following + dict: + { 'a': { 'b': 'answer' }}""" + obj = dic + for key in path.split('/'): + if obj[key]: + obj = obj[key] + else: + return None + return obj + + @staticmethod + def print_key_value(key, value): + # type (str, str) -> None + """Pretty print the key and value""" + return "%30s : %s" % (key, value) + + def get_elem(self, path): + # type: (str) -> object + """Get element from the job object""" + return JenkinsJob.dict_get_elem_by_path(self.content, path) + + def __repr__(self): + # type: () -> str + result = "\n".join([ + JenkinsJob.print_key_value(tup[0], tup[1]) for tup in [ + ['job_name', self.job_name], + ['folder', self.folder], + ['branch', self.branch]]]) + if self.content: + result += "\n\n%s" % "\n".join([ + JenkinsJob.print_key_value( + key, self.get_elem(key)) for key in [ + 'displayName', + 'fullName', + 'lastBuild/number', + 'lastCompletedBuild/number', + 'lastFailedBuild/number', + 'lastSuccessfulBuild/number']]) + return result + + def __init__(self, server, job_name, folder='', branch=''): + # type (JenkinsServer, str, str, str) -> None + self.server = server + self.job_name = job_name + self.folder = folder + self.branch = branch + self.content = None + job_path = "job/%s" % self.job_name + if folder: + job_path = "job/%s/%s" % (folder, job_path) + if branch: + job_path += "/job/%s" % branch + self.url = "%s%s" % (self.server.server_url, job_path) + + @classmethod + def add_parser( + cls, arg_parser=None, + only_options=False, env_sub_commands=None): + # type: (ZanataArgParser, bool) -> ZanataArgParser + """Add JenkinsJob parameters to parser + arg_parser: existing parser to be appended to + only_options: Add only options and JenkinsServer env""" + if not arg_parser or not arg_parser.has_env('JENKINS_URL'): + arg_parser = JenkinsServer.add_parser( + arg_parser, env_sub_commands) + arg_parser.add_common_argument( + '-b', '--branch', type=str, + help='branch or PR name') + arg_parser.add_common_argument( + '-F', '--folder', type=str, + help='GitHub Organization Folder') + if not only_options: + arg_parser.add_common_argument( + 'job_name', type=str, help='job name') + + # Add sub commands + arg_parser.add_sub_command( + 'get-job', None, + help='Show job objects') + arg_parser.add_sub_command( + 'get-last-successful-build', None, + help=cls.get_last_successful_build.__doc__) + arg_parser.add_sub_command( + 'get-last-successful-artifacts', + { + '-p --artifact-path-patterns': { + 'type': str, 'default': '.*', + 'help': 'comma split artifact path regex pattern'}}, # noqa: E501, #pylint: disable=line-too-long + help='Get matching last-successful artifacts. Default: .*') + arg_parser.add_sub_command( + 'download-last-successful-artifacts', + { + '-p --artifact-path-patterns': { + 'type': str, 'default': '.*', + 'help': 'comma split artifact path regex' + }, + '-d --download-dir': { + 'type': str, 'default': '.', + 'help': 'Download directory'} + }, + help='Get matching last-successful artifacts. Default: .*') + return arg_parser + + @classmethod + def init_from_parsed_args(cls, args): + """New an instance from parsed args""" + server = JenkinsServer.init_from_parsed_args(args) + kwargs = {'job_name': args.job_name} + for k in ['folder', 'branch']: + if hasattr(args, k): + kwargs[k] = getattr(args, k) + return cls(server, **kwargs) + + def load(self): + # type: () -> None + """Load the build object from Jenkins server""" + logging.debug("Loading job from %s/api/python", self.url) + self.content = ast.literal_eval(UrlHelper.read( + "%s/api/python" % self.url)) + + def get_last_successful_build(self): + # type: () -> JenkinsJobBuild + """Get last successful build""" + if not self.content: + self.load() + + if not self.content: + raise AssertionError("Failed to load job from %s" % self.url) + return JenkinsJobBuild( + self, + int(self.get_elem('lastSuccessfulBuild/number')), + self.get_elem('lastSuccessfulBuild/url')) + + def get_last_successful_artifacts( + self, artifact_path_patterns=None): + # type: (List[str]) -> List[str] + """Get last successful artifacts that matches patterns""" + build = self.get_last_successful_build() + return build.list_artifacts_related_paths(artifact_path_patterns) + + def download_last_successful_artifacts( + self, artifact_path_patterns=None, download_dir='.'): + # type: (List[str])-> List[str] + """Download last successful artifacts that matches patterns. + Returns related path of artifacts + + Note the directory structure will be flattern.""" + if not artifact_path_patterns: + artifact_path_patterns = ['.*'] + build = self.get_last_successful_build() + artifact_path_list = build.list_artifacts_related_paths( + artifact_path_patterns) + for artifact_path in artifact_path_list: + UrlHelper.download_file( + build.url + 'artifact/' + artifact_path, + download_dir=download_dir) + return artifact_path_list + + +class JenkinsJobBuild(object): + """Build object for Jenkins job""" + + def __init__(self, parent_job, build_number, build_url): + # type (object, int, str) -> None + self.parent_job = parent_job + self.number = build_number + self.url = build_url + self.content = None + + def get_elem(self, path): + # type: (str) -> object + """Get element from the build object""" + return JenkinsJob.dict_get_elem_by_path(self.content, path) + + def load(self): + """Load the build object from Jenkins server""" + logging.debug("Loading build from %sapi/python", self.url) + self.content = ast.literal_eval(UrlHelper.read( + "%s/api/python" % self.url)) + + def list_artifacts_related_paths(self, artifact_path_patterns=None): + # type: (str) -> List[str] + """Return a List of relativePaths of artifacts + that matches the path patterns""" + if not artifact_path_patterns: + artifact_path_patterns = ['.*'] + if not self.content: + self.load() + if not self.content: + raise AssertionError("Failed to load build from %s" % self.url) + result = [] + for artifact in self.content['artifacts']: + for pattern in artifact_path_patterns: + if re.search(pattern, artifact['relativePath']): + result.append(artifact['relativePath']) + break # Only append once + return result + + def __repr__(self): + # type: () -> str + result = "\n".join([ + JenkinsJob.print_key_value( + tup[0], str(tup[1])) for tup in [ + ['number', self.number], + ['url', self.url]]]) + + if self.content: + result += "\n\n%s" % "\n".join([ + JenkinsJob.print_key_value( + key, self.get_elem(key)) for key in [ + 'nextBuild/number', + 'previousBuild/number']]) + result += "\n\nArtifacts:\n%s" % "\n ".join( + self.list_artifacts_related_paths()) + return result + + +def run_sub_command(args): + # type (ZanataArgParser.Namespace) -> None + """Run the sub command""" + job = JenkinsJob.init_from_parsed_args(args) + job.load() + if args.sub_command == 'get-job': + print(job) + elif args.sub_command == 'get-last-successful-build': + build = job.get_last_successful_build() + build.load() + print(build) + elif args.sub_command == 'get-last-successful-artifacts': + print('\n'.join( + job.get_last_successful_artifacts( + args.artifact_path_patterns.split(',')) + )) + elif args.sub_command == 'download-last-successful-artifacts': + artifact_path_list = job.download_last_successful_artifacts( + args.artifact_path_patterns.split(','), + args.download_dir) + print("Downloaded files %s" % '\n'.join(artifact_path_list)) + + +if __name__ == '__main__': + run_sub_command(JenkinsJob.add_parser().parse_all()) diff --git a/ZanataArgParser.py b/ZanataArgParser.py new file mode 100755 index 0000000..5266ae0 --- /dev/null +++ b/ZanataArgParser.py @@ -0,0 +1,174 @@ +#!/usr/bin/env python +"""ZanataArgParser is an sub-class of ArgumentParser +that handles sub-parser, environments more easily.""" + +from __future__ import ( + absolute_import, division, print_function) + +from argparse import ArgumentParser, ArgumentError +import logging +import os + + +class ZanataArgParser(ArgumentParser): + """Zanata Argument Parser""" + def __init__(self, *args, **kwargs): + # type: (str, object, object) -> None + super(ZanataArgParser, self).__init__(*args, **kwargs) + self.sub_parsers = None + self.env_def = {} + self.parent_parser = ArgumentParser(add_help=False) + self.add_argument( + '-v', '--verbose', type=str, default='INFO', + metavar='VERBOSE_LEVEL', + help='Valid values: %s' + % 'DEBUG, INFO, WARNING, ERROR, CRITICAL, NONE') + + def add_common_argument(self, *args, **kwargs): + # type: (str, object, object) -> argparse + """Add a common argument that will be used in all sub commands + In other words, common argument wil be put in common parser. + Note that add_common_argument must be put in then front of + add_sub_command that uses common arguments.""" + self.parent_parser.add_argument(*args, **kwargs) + + def add_sub_command(self, name, arguments, **kwargs): + # type: (str, object, object) -> argparse ArgumentParser + """Add a sub command""" + if not self.sub_parsers: + self.sub_parsers = self.add_subparsers( + title='Command', description='Valid commands', + help='Command help') + + if 'parents' in kwargs: + kwargs['parents'] += [self.parent_parser] + else: + kwargs['parents'] = [self.parent_parser] + + anonymous_parser = self.sub_parsers.add_parser( + name, **kwargs) + if arguments: + for k, v in arguments.iteritems(): + anonymous_parser.add_argument(*k.split(), **v) + anonymous_parser.set_defaults(sub_command=name) + return anonymous_parser + + def add_env( # pylint: disable=too-many-arguments + self, env_name, + default=None, + required=False, + value_type=str, + dest=None, + sub_commands=None): + # type: (str, object, bool, type, str, List[str]) -> None + """Add environment variable + env_name: Environment variable name + default: Default value + value_type: type of value e.g. str + dest: attribute name to be return by parse_* + sub_commands: List of subcommands that use this environment""" + if not dest: + dest = env_name.lower() + if env_name in self.env_def: + raise ArgumentError( + None, "Duplicate environment name %s" % env_name) + self.env_def[env_name] = { + 'default': default, + 'required': required, + 'value_type': value_type, + 'dest': dest, + 'sub_commands': sub_commands} + + def has_common_argument(self, option_string=None, dest=None): + # type: (str, str) -> bool + """Has the parser defined this argument as a common argument? + Either specify option_string or dest + option_string: option in command line. e.g. -i + dest: attribute name to be return by parse_*""" + for action in self.parent_parser._actions: # pylint: disable=W0212 + if option_string: + if option_string in action.option_strings: + return True + else: + continue + elif dest: + if dest == action.dest: + return True + else: + continue + else: + raise ArgumentError(None, "need either option_string or dest") + return False + + def has_env(self, env_name): + # type: (str) -> bool + """Whether this parser parses this environment""" + return env_name in self.env_def + + def parse_args(self, args=None, namespace=None): + # type: (str, List, object) -> argparse.Namespace + """Parse arguments""" + result = super(ZanataArgParser, self).parse_args(args, namespace) + logging.basicConfig( + format='%(asctime)-15s [%(levelname)s] %(message)s') + logger = logging.getLogger() + if result.verbose == 'NONE': + # Not showing any log + logger.setLevel(logging.CRITICAL + 1) + elif hasattr(logging, result.verbose): + logger.setLevel(getattr(logging, result.verbose)) + else: + ArgumentError(None, "Invalid verbose level: %s" % result.verbose) + delattr(result, 'verbose') + return result + + @staticmethod + def _is_env_valid(env_name, env_value, env_data, args): + # type (str, str, dict, argparse.Namespace) -> bool + """The invalid env should be skipped or raise error""" + # Skip when the env is NOT in the list of supported sub-commands + if env_data['sub_commands'] and args and hasattr(args, 'sub_command'): + if args.sub_command not in env_data['sub_commands']: + return False + + # Check whether the env_value is valid + if not env_value: + if env_data['required']: + # missing required value + raise AssertionError("Missing environment '%s'" % env_name) + elif not env_data['default']: + # no default value + return False + return True + + def parse_env(self, args=None): + # type: (argparse.Namespace) -> dict + """Parse environment""" + result = {} + for env_name in self.env_def: + env_data = self.env_def[env_name] + env_value = os.environ.get(env_name) + try: + if not ZanataArgParser._is_env_valid( + env_name, env_value, env_data, args): + continue + except AssertionError as e: + raise e + if not env_value: + if env_data['required']: + raise AssertionError("Missing environment '%s'" % env_name) + elif not env_data['default']: + continue + else: + env_value = env_data['default'] + result[env_data['dest']] = env_value + return result + + def parse_all(self, args=None, namespace=None): + # type: (str, List, object) -> argparse.Namespace + """Parse arguments and environment""" + result = self.parse_args(args, namespace) + env_dict = self.parse_env(result) + for k, v in env_dict.iteritems(): # pylint: disable=no-member + setattr(result, k, v) + return result diff --git a/ZanataFunctions.py b/ZanataFunctions.py index e51d1f3..adf2bd2 100755 --- a/ZanataFunctions.py +++ b/ZanataFunctions.py @@ -1,14 +1,18 @@ #!/usr/bin/env python """Generic Helper Function""" -import argparse +from __future__ import ( + absolute_import, division, print_function, unicode_literals) + +import codecs import errno import logging import os import subprocess # nosec import sys -import urllib2 # noqa: F401 # pylint: disable=import-error,unused-import -import urlparse # noqa: F401 # pylint: disable=import-error,unused-import +import urllib2 # noqa: F401 # pylint: disable=import-error +import urlparse # noqa: F401 # pylint: disable=import-error +from ZanataArgParser import ZanataArgParser # pylint: disable=import-error try: from typing import List, Any # noqa: F401 # pylint: disable=unused-import @@ -21,15 +25,6 @@ BASH_CMD = '/bin/bash' -def logging_init(level=logging.INFO): - # type (int) -> logging.Logger - """Initialize logging""" - logging.basicConfig(format='%(asctime)-15s [%(levelname)s] %(message)s') - logger = logging.getLogger() - logger.setLevel(level) - return logger - - def read_env(filename): # type (str) -> dict """Read environment variables by sourcing a bash file""" @@ -38,8 +33,9 @@ def read_env(filename): "source %s && set -o posix && set" % (filename)], stdout=subprocess.PIPE) return {kv[0]: kv[1] for kv in [ - s.strip().split('=', 1) - for s in proc.stdout.readlines() if '=' in s]} + u.strip().split('=', 1) + for u in codecs.getreader('utf8')(proc.stdout).readlines() + if '=' in u]} ZANATA_ENV = read_env(ZANATA_ENV_FILE) @@ -47,13 +43,15 @@ def read_env(filename): class HTTPBasicAuthHandler(urllib2.HTTPBasicAuthHandler): """Handle Basic Authentication""" - def http_error_401(self, req, fp, code, msg, headers): # noqa: E501 # pylint: disable=invalid-name,unused-argument,too-many-arguments + def http_error_401( # pylint: disable=too-many-arguments,unused-argument + self, req, fp, code, msg, headers): """retry with basic auth when facing a 401""" host = req.get_host() realm = None return self.retry_http_basic_auth(host, req, realm) - def http_error_403(self, req, fp, code, msg, hdrs): # noqa: E501 # pylint: disable=invalid-name,unused-argument,too-many-arguments + def http_error_403( # pylint: disable=too-many-arguments,unused-argument + self, req, fp, code, msg, hdrs): """retry with basic auth when facing a 403""" host = req.get_host() realm = None @@ -66,58 +64,94 @@ class SshHost(object): SCP_CMD = '/usr/bin/scp' SSH_CMD = '/usr/bin/ssh' - @staticmethod - def create_parent_parser(): - # type () -> argparse.ArgumentParser - """Create parent parser for SSH related program""" - parent_parser = argparse.ArgumentParser(add_help=False) - parent_parser.add_argument( - '-i', '--identity-file', type=str, - help='SSH/SCP indent-files') - parent_parser.add_argument( - 'host', type=str, - help='host with/without username,' - + ' e.g. user@host.example or host.example') - parent_parser.add_argument( - '-t', '--dest-path', type=str, help='Destination path') - return parent_parser - - def __init__(self, host, identity_file=None): - # type (str, str) -> None + def __init__(self, host, ssh_user=None, identity_file=None): + # type (str, str, str) -> None self.host = host + self.ssh_user = ssh_user self.identity_file = identity_file if self.identity_file: self.opt_list = ['-i', identity_file] else: self.opt_list = [] - def run_check_call(self, command, sudo=False): - # type (str, bool) -> None - """Run command though ssh""" + @classmethod + def add_parser(cls, arg_parser=None): + # type (ZanataArgParser) -> ZanataArgParser + """Add SshHost parameters to a parser""" + if not arg_parser: + arg_parser = ZanataArgParser( + description=cls.__doc__) + arg_parser.add_common_argument( + '-u', '--ssh-user', type=str, + help='Connect SSH/SCP as this user') + arg_parser.add_common_argument( + '-i', '--identity-file', type=str, + help='SSH/SCP ident-files') + arg_parser.add_common_argument( + 'host', type=str, + help='host name') + return arg_parser + + @classmethod + def init_from_parsed_args(cls, args): + """Init from command line arguments""" + kwargs = {'host': args.host} + for k in ['ssh_user', 'identitity_file']: + if hasattr(args, k): + kwargs[k] = getattr(args, k) + return cls(**kwargs) + + def _get_user_host(self): + # type () -> str + """Produce [user@]host""" + return "%s%s" % ( + '' if not self.ssh_user else self.ssh_user + '@', self.host) + + def _run_check(self, command, sudo): + # type (str, bool) -> List[str] + """Return cmd_list""" cmd_list = [SshHost.SSH_CMD] cmd_list += self.opt_list - cmd_list += [ - self.host, - ('sudo ' if sudo else '') + command] - logging.info(' '.join(cmd_list)) + cmd_list += [self._get_user_host()] + cmd_list += [('sudo ' if sudo else '') + command] + logging.debug(' '.join(cmd_list)) + return cmd_list + def run_check_call(self, command, sudo=False): + # type (str, bool) -> None + """Run command though ssh""" + cmd_list = self._run_check(command, sudo) subprocess.check_call(cmd_list) # nosec - def scp_to_remote( + def run_check_output(self, command, sudo=False): + # type (str, bool) -> str + """Run command though ssh, return stdout""" + cmd_list = self._run_check(command, sudo) + return subprocess.check_output(cmd_list) # nosec + + def run_chown(self, user, group, filename, options=None): + # type (str, bool) -> None + """Run command though ssh""" + self.run_check_call( + "chown %s %s:%s %s" % ( + '' if not options else ' '.join(options), + user, group, filename), + True) + + def scp_to_host( self, source_path, dest_path, sudo=False, rm_old=False): # type (str, str, bool, bool) -> None - """scp to remote host""" + """scp to host""" if rm_old: self.run_check_call( "rm -fr %s" % dest_path, sudo) - cmd_list = [ - 'scp', '-p'] + self.opt_list + [ - source_path, - "%s:%s" % (self.host, dest_path)] + cmd_list = ['scp', '-p'] + self.opt_list + [ + source_path, + "%s:%s" % (self._get_user_host(), dest_path)] - logging.info(' '.join(cmd_list)) + logging.debug(' '.join(cmd_list)) subprocess.check_call(cmd_list) # nosec @@ -141,11 +175,11 @@ def __init__(self, base_url, user, token): def read(url): # type (str) -> str """Read URL""" - logging.info("Reading from %s", url) + logging.debug("Reading from %s", url) return urllib2.urlopen(url).read() # nosec @staticmethod - def download_file(url, dest_file='', dest_dir='.'): + def download_file(url, dest_file='', download_dir='.'): # type (str, str, str) -> None """Download file""" target_file = dest_file @@ -153,12 +187,13 @@ def download_file(url, dest_file='', dest_dir='.'): url_parsed = urlparse.urlparse(url) target_file = os.path.basename(url_parsed.path) chunk = 128 * 1024 # 128 KiB - target_dir = os.path.abspath(dest_dir) + target_dir = os.path.abspath(download_dir) target_path = os.path.join(target_dir, target_file) try: os.makedirs(target_dir) except OSError as exc: if exc.errno == errno.EEXIST and os.path.isdir(target_dir): + # Dir already exists pass else: raise diff --git a/ZanataServer.py b/ZanataServer.py new file mode 100755 index 0000000..e6f1785 --- /dev/null +++ b/ZanataServer.py @@ -0,0 +1,135 @@ +#!/usr/bin/env python +"""Zanata Server Helper functions""" + +from __future__ import ( + absolute_import, division, print_function, unicode_literals) +import logging +import os +import os.path +import sys + + +from ZanataArgParser import ZanataArgParser # pylint: disable=E0401 +from ZanataFunctions import SshHost +from JenkinsHelper import JenkinsServer, JenkinsJob + +try: + # We need to import 'List' and 'Any' for mypy to work + from typing import List, Any # noqa: F401 # pylint: disable=unused-import +except ImportError: + sys.stderr.write("python typing module is not installed" + os.linesep) + + +class ZanataServer(SshHost): + """zanata.Server Class""" + def __init__( + self, host, ssh_user=None, identity=None, + deploy_dir='/var/opt/rh/eap7/lib/wildfly/standalone/deployments'): + # type (str, str, str) -> None + """New an instance + src_url: Source URL of zanata WAR file + local_war_path: The local path to the WAR file""" + super(ZanataServer, self).__init__(host, ssh_user, identity) + self.deploy_dir = deploy_dir + self.jboss_user = 'jboss' + self.jboss_group = 'jboss' + + @classmethod + def add_parser(cls, arg_parser=None): + # type (ZanataArgParser) -> ZanataArgParser + """Add Zanata Server parameters to a parser""" + if not arg_parser: + arg_parser = ZanataArgParser(description=__doc__) + arg_parser = super(ZanataServer, cls).add_parser(arg_parser) + + arg_parser.add_sub_command( + 'deploy-war-file', + { + 'war_file': { + 'type': str, + 'help': 'WAR file'} + }, + help=cls.deploy_war_file.__doc__) + + # Include JenkinsJob parser for following sub command + if not arg_parser.has_common_argument(dest='branch'): + arg_parser = JenkinsJob.add_parser( + arg_parser, True, ['deploy-from-jenkins-last-successful']) + arg_parser.add_sub_command( + 'deploy-from-jenkins-last-successful', + None, + help=cls.deploy_from_jenkins_last_successful.__doc__) + return arg_parser + + @staticmethod + def download_last_successful_war( + jenkins_server, branch, + folder, download_dir='.'): + # type (JenkinsServer, str, str) -> str + """Dowload last successful war from jenkins""" + logging.debug( + "download_last_successful_war_from_jenkins(%s, %s, %s)", + jenkins_server.server_url, branch, folder) + jenkins_job = JenkinsJob( + jenkins_server, 'zanata-platform', folder=folder, + branch=branch) + jenkins_job.load() + war_file_list = jenkins_job.download_last_successful_artifacts( + [r'zanata-war/.*/zanata.*\.war'], + download_dir) + return download_dir + '/' + os.path.basename(war_file_list[0]) + + def deploy_war_file( + self, war_file, rm_old=True, + scp_dest_dir='/usr/local/share/applications'): + # type (str, bool, str) -> None + """scp WAR file to server""" + dest_war = "%s/zanata.war" % scp_dest_dir + tmp_dest_war = dest_war + '.tmp' + + self.scp_to_host( + war_file, tmp_dest_war, sudo=True, rm_old=rm_old) + self.run_chown(self.jboss_user, self.jboss_group, tmp_dest_war) + + # Link war file to dest + self.run_check_call("systemctl stop eap7-standalone", True) + # mv to dest_war after eap7 is stopped + self.run_check_call("mv -f %s %s" % (tmp_dest_war, dest_war), True) + deploy_war_file = "%s/zanata.war" % self.deploy_dir + self.run_check_call( + "ln -sf %s %s" % (dest_war, deploy_war_file), True) + self.run_chown(self.jboss_user, self.jboss_group, deploy_war_file) + self.run_check_call("systemctl start eap7-standalone", True) + + logging.info("Done") + + def deploy_from_jenkins_last_successful( + self, jenkins_server, + branch='master', + folder='github-zanata-org', rm_old=True): + # type () -> None + """Download from last successful WAR file from jenkins, then deploy""" + downloaded_war = ZanataServer.download_last_successful_war( + jenkins_server, branch, folder) + self.deploy_war_file(downloaded_war, rm_old) + + +def run_sub_command(args): + # type (dict) -> None + """Run the sub command""" + z_server = ZanataServer.init_from_parsed_args(args) + + if args.sub_command == 'deploy-war-file': + z_server.deploy_war_file(args.war_file) + elif args.sub_command == 'deploy-from-jenkins-last-successful': + jenkins_server = JenkinsServer.init_from_parsed_args(args) + kwargs = {} + for key in ['branch', 'folder']: + if hasattr(args, key) and getattr(args, key): + kwargs[key] = getattr(args, key) + z_server.deploy_from_jenkins_last_successful( + jenkins_server, **kwargs) + + +if __name__ == '__main__': + run_sub_command(ZanataServer.add_parser().parse_all()) diff --git a/py-test-all b/py-test-all index 91fab31..78f5d6b 100755 --- a/py-test-all +++ b/py-test-all @@ -6,6 +6,10 @@ set -eu PY_SOURCES=*.py +## pytest +echo "====== pytest-2 ======" > /dev/stderr +pytest-2 *.py + ## Pylint echo "====== pylint ======" > /dev/stderr python2 -m pylint $PY_SOURCES @@ -14,7 +18,4 @@ python2 -m pylint $PY_SOURCES echo "====== flake8 =====" > /dev/stderr flake8 --benchmark $PY_SOURCES -## pytest -echo "====== pytest-2 ======" > /dev/stderr -pytest-2 $PY_SOURCES diff --git a/testZanataArgParser.py b/testZanataArgParser.py new file mode 100755 index 0000000..f96ced8 --- /dev/null +++ b/testZanataArgParser.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python +"""Test the ZanataArgParser.py""" + +from __future__ import ( + absolute_import, division, print_function) + +import StringIO # pylint: disable=E0401 +import os +import sys +import unittest +import ZanataArgParser # pylint: disable=E0401 + + +def _convert_unicode_str(dictionary): + """Recursively converts dictionary keys to strings""" + if not isinstance(dictionary, dict): + if isinstance(dictionary, unicode): # NOQA # pylint: disable=E0602 + return str(dictionary) + return str(dictionary) + return dict( + (str(k), _convert_unicode_str(v)) + for k, v in dictionary.iteritems()) + + +class ZanataArgParserTestCase(unittest.TestCase): + """Test Case for ZanataArgParser.py""" + def setUp(self): + self.parser = ZanataArgParser.ZanataArgParser('parser-test') + self.parser.add_common_argument( + '-b', '--branch', type=str, default='', + help='branch or PR name') + self.parser.add_common_argument('job_name', type=str, help='job name') + self.parser.add_env( + 'HOME', default='str', required=True) + self.parser.add_env( + 'LOGNAME', default='str', required=False) + self.parser.add_sub_command( + 'show-job', None, help='Get Job objects') + + def _match_result( + self, method, expected_args, param_list, stdout_pattern=None): + captured_output = StringIO.StringIO() + sys.stdout = captured_output + args = getattr(self.parser, method)(param_list) + sys.stdout = sys.__stdout__ + if stdout_pattern: + self.assertRegexpMatches( # pylint: disable=W1505 + captured_output.getvalue(), stdout_pattern) + self.assertDictEqual( + _convert_unicode_str(expected_args), + _convert_unicode_str(args.__dict__)) + + def test_add_sub_command(self): + """Test add_sub_command""" + self.parser.add_sub_command( + 'show-last-successful-build', + {'-F --folder': { + 'type': str, 'default': '', + 'help': 'folder name'}}, + help='Get build objects') + + # Test Run + self._match_result( + 'parse_args', + { + 'branch': '', + 'job_name': 'zanata-platform', + 'sub_command': 'show-job'}, + ['show-job', 'zanata-platform']) + self._match_result( + 'parse_args', + { + 'branch': '', 'folder': 'github-zanata-org', + 'job_name': 'zanata-platform', + 'sub_command': 'show-last-successful-build'}, + [ + 'show-last-successful-build', + '-F', 'github-zanata-org', 'zanata-platform']) + + def test_env(self): + """Test parse_env""" + home = os.environ.get('HOME') + env_dict = self.parser.parse_env() + self.assertEqual(home, env_dict['home']) + + def test_parse_all(self): + """Test parse_all""" + log_name = os.environ.get('LOGNAME') + home = os.environ.get('HOME') + + self._match_result( + 'parse_all', + { + 'branch': 'release', + 'job_name': 'zanata-platform', + 'sub_command': 'show-job', + 'home': home, + 'logname': log_name}, + [ + 'show-job', + '-b', 'release', 'zanata-platform']) + + env_dict = self.parser.parse_env() + self.assertEqual(home, env_dict['home']) + + +if __name__ == '__main__': + unittest.main() diff --git a/testZanataFunctions.py b/testZanataFunctions.py index 8309e8c..a933e56 100755 --- a/testZanataFunctions.py +++ b/testZanataFunctions.py @@ -1,5 +1,9 @@ #!/usr/bin/env python """Test the ZanataFunctions""" + +from __future__ import ( + absolute_import, division, print_function, unicode_literals) + import subprocess # nosec import unittest import ZanataFunctions