From cd59f3eee32fff7d844a24e534f43156a6f51761 Mon Sep 17 00:00:00 2001 From: Ding-Yi Chen Date: Wed, 21 Mar 2018 10:14:32 +1000 Subject: [PATCH] feat(ZNTA-1964): Deploy war to server --- JenkinsHelper.py | 272 +++++++++++++++++++++++++++++++++++++++++++++++ ZanataWar.py | 228 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 500 insertions(+) create mode 100755 JenkinsHelper.py create mode 100755 ZanataWar.py diff --git a/JenkinsHelper.py b/JenkinsHelper.py new file mode 100755 index 0000000..4402a6f --- /dev/null +++ b/JenkinsHelper.py @@ -0,0 +1,272 @@ +#!/usr/bin/env python +"""Jenkins Helper functions +It contains jenkins helper +Run JenkinsHelper --help or JenkinsHelper --help for +detail help.""" + +import argparse +import ast +import logging +import os +import os.path +import re +import sys + +# from typing import List, Any # noqa: F401 # pylint: disable=unused-import +from ZanataFunctions import UrlHelper, logging_init + +try: + # We need to import 'List' and 'Any' for mypy to work + from typing import List, Any # noqa: F401 # pylint: disable=unused-import +except ImportError: + sys.stderr.write("python typing module is not installed" + os.linesep) + + +class JenkinsServer(object): + """Jenkins Helper functions""" + def __init__(self, server_url, user, token): + # type: (str, str,str) -> None + self.url_helper = UrlHelper( + server_url, user, token) + self.server_url = server_url + self.user = user + self.token = token + + def __getitem__(self, key): + # type: (str) -> str + return self[key] + + @staticmethod + def init_default(): + # type: () -> None + """Init JenkinsServer connection with default environment.""" + zanata_jenkins = { + 'server_url': os.environ.get('JENKINS_URL'), + 'user': os.environ.get('ZANATA_JENKINS_USER'), + 'token': os.environ.get('ZANATA_JENKINS_TOKEN'), + } + + if not zanata_jenkins['server_url']: + raise AssertionError("Missing environment 'JENKINS_URL'") + if not zanata_jenkins['user']: + raise AssertionError("Missing environment 'ZANATA_JENKINS_USER'") + if not zanata_jenkins['token']: + raise AssertionError("Missing environment 'ZANATA_JENKINS_TOKEN'") + + return JenkinsServer( + zanata_jenkins['server_url'], + zanata_jenkins['user'], + zanata_jenkins['token'], + ) + + @staticmethod + def create_parent_parser(): + # type () -> argparse.ArgumentParser + """Create a parser as parent of Jenkins job argument parser + e.g. -F , -b and """ + job_parent_parser = argparse.ArgumentParser(add_help=False) + job_parent_parser.add_argument( + '-F', '--folder', type=str, default='', + help='folder name') + job_parent_parser.add_argument( + '-b', '--branch', type=str, default='', + help='branch or PR name') + job_parent_parser.add_argument('job', type=str, help='job name') + return job_parent_parser + + +class JenkinsJob(object): + """Jenkins Job Objects""" + + @staticmethod + def dict_get_elem_by_path(dic, path): + # type (dict, str) -> object + """Return the elem in python dictionary given path + for example: you can use a/b to retrieve answer from following + dict: + { 'a': { 'b': 'answer' }}""" + obj = dic + for key in path.split('/'): + if obj[key]: + obj = obj[key] + else: + return None + return obj + + @staticmethod + def print_key_value(key, value): + # type (str, str) -> None + """Pretty print the key and value""" + return "%30s : %s" % (key, value) + + def get_elem(self, path): + # type: (str) -> object + """Get element from the job object""" + return JenkinsJob.dict_get_elem_by_path(self.content, path) + + def __repr__(self): + # type: () -> str + result = "\n".join([ + JenkinsJob.print_key_value(tup[0], tup[1]) for tup in [ + ['name', self.name], + ['folder', self.folder], + ['branch', self.branch]]]) + if self.content: + result += "\n\n%s" % "\n".join([ + JenkinsJob.print_key_value( + key, self.get_elem(key)) for key in [ + 'displayName', + 'fullName', + 'lastBuild/number', + 'lastCompletedBuild/number', + 'lastFailedBuild/number', + 'lastSuccessfulBuild/number']]) + return result + + def __init__(self, server, name, folder, branch): + # type (JenkinsServer, str, str, str) -> None + self.server = server + self.name = name + self.folder = folder + self.branch = branch + self.content = None + job_path = "job/%s" % self.name + if folder: + job_path = "job/%s/%s" % (folder, job_path) + if branch: + job_path += "/job/%s" % branch + self.url = "%s%s" % (self.server.server_url, job_path) + + def __getitem__(self, key): + # type: (str) -> str + return self[key] + + def load(self): + # type: () -> None + """Load the build object from Jenkins server""" + logging.info("Loading job from %s/api/python", self.url) + self.content = ast.literal_eval(UrlHelper.read( + "%s/api/python" % self.url)) + + def get_last_successful_build(self): + # type: () -> JenkinsJobBuild + """Get last successful build""" + if not self.content: + self.load() + + if not self.content: + raise AssertionError("Failed to load job from %s" % self.url) + return JenkinsJobBuild( + self, + int(self.get_elem('lastSuccessfulBuild/number')), + self.get_elem('lastSuccessfulBuild/url')) + + +class JenkinsJobBuild(object): + """Build object for Jenkins job""" + + def __init__(self, parent_job, build_number, build_url): + # type (object, int, str) -> None + self.parent_job = parent_job + self.number = build_number + self.url = build_url + self.content = None + + def __getitem__(self, key): + # type: (str) -> str + return self[key] + + def get_elem(self, path): + # type: (str) -> object + """Get element from the build object""" + return JenkinsJob.dict_get_elem_by_path(self.content, path) + + def load(self): + """Load the build object from Jenkins server""" + logging.info("Loading build from %sapi/python", self.url) + self.content = ast.literal_eval(UrlHelper.read( + "%s/api/python" % self.url)) + + def list_artifacts_related_paths(self, artifact_path_pattern='.*'): + # type: (str) -> List[str] + """Return a List of relativePaths of artifacts + that matches the path pattern""" + if not self.content: + self.load() + if not self.content: + raise AssertionError("Failed to load build from %s" % self.url) + return [ + artifact['relativePath'] + for artifact in self.content['artifacts'] + if re.search(artifact_path_pattern, artifact['relativePath'])] + + def __repr__(self): + # type: () -> str + result = "\n".join([ + JenkinsJob.print_key_value( + tup[0], str(tup[1])) for tup in [ + ['number', self.number], + ['url', self.url]]]) + + if self.content: + result += "\n\n%s" % "\n".join([ + JenkinsJob.print_key_value( + key, self.get_elem(key)) for key in [ + 'nextBuild/number', + 'previousBuild/number']]) + result += "\n\nArtifacts:\n%s" % "\n ".join( + self.list_artifacts_related_paths()) + return result + + +def show_job(): + # type () -> None + """Show the job information""" + server = JenkinsServer.init_default() + job = JenkinsJob(server, args.job, args.folder, args.branch) + print(job) + + +def show_last_successful_build(): + # type () -> None + """Show the last successful build for a Jenkins job""" + server = JenkinsServer.init_default() + job = JenkinsJob(server, args.job, args.folder, args.branch) + + build = job.get_last_successful_build() + build.load() + print(build) + + +def parse(): + # type () -> None + """Parse options and arguments""" + + parser = argparse.ArgumentParser(description='Jenkins helper functions') + job_parent_parser = JenkinsServer.create_parent_parser() + + subparsers = parser.add_subparsers( + title='Command', description='Valid commands', + help='Command help') + + job_parser = subparsers.add_parser( + 'show-job', + help='Get Job objects', + parents=[job_parent_parser], + ) + job_parser.set_defaults(func=show_job) + + build_parser = subparsers.add_parser( + 'show-last-successful-build', + help='Get build objects', + parents=[job_parent_parser]) + build_parser.set_defaults(func=show_last_successful_build) + + return parser.parse_args() + + +if __name__ == '__main__': + logging_init() + + args = parse() # pylint: disable=invalid-name + args.func() diff --git a/ZanataWar.py b/ZanataWar.py new file mode 100755 index 0000000..1010f5b --- /dev/null +++ b/ZanataWar.py @@ -0,0 +1,228 @@ +#!/usr/bin/env python +"""Zanata WAR Helper functions""" + +import argparse +import os +import sys +import urlparse # pylint: disable=import-error +# python3-pylint does not do well on importing python2 module +from ZanataFunctions import logging_init +from ZanataFunctions import SshHost +from ZanataFunctions import UrlHelper +import JenkinsHelper + +try: + # We need to import 'List' and 'Any' for mypy to work + from typing import List, Any # noqa: F401 # pylint: disable=unused-import +except ImportError: + sys.stderr.write("python typing module is not installed" + os.linesep) + + +class ZanataWar(object): + """Class that manipulate zanata.war""" + def __init__(self, download_url=None, local_path=None): + # type (str, str) -> None + self.download_url = download_url + self.local_path = local_path + + def __getitem__(self, key): + # type: (str) -> str + return self[key] + + @staticmethod + def create_parent_parser(): + # type () -> argparse.ArgumentParser + """Create a parser as parent of Zanata war argument parser""" + parent_parser = argparse.ArgumentParser(add_help=False) + parent_parser.add_argument( + '-l', '--local-path', type=str, + help='Path to local WAR file.') + return parent_parser + + @staticmethod + def get_last_successful_build_url(server, job_name, folder, branch): + # type (JenkinsHelper.JenkinsServer, str, str, str) -> str + """Get the WAR file download url for last successful build""" + job = JenkinsHelper.JenkinsJob(server, job_name, folder, branch) + job.load() + build = job.get_last_successful_build() + build.load() + war_paths = build.list_artifacts_related_paths( + r'^.*/zanata-([0-9.]+).*\.war$') + return build.url + 'artifact/' + war_paths[0] + + def download( + self, + dest_file=None, + dest_dir=None): + # type (str, str) -> None + """Download WAR file from .download_url""" + if not self.download_url: + raise AssertionError("war download_url is not set.") + target_file = dest_file + if not target_file: + url_dict = urlparse.urlparse(self.download_url) + target_file = os.path.basename(url_dict.path) + + target_dir = os.path.abspath('.' if not dest_dir else dest_dir) + UrlHelper.download_file(self.download_url, target_file, target_dir) + self.local_path = os.path.join(target_dir, target_file) + + def scp_to_server( # pylint: disable=too-many-arguments + self, dest_host, + dest_path=None, + identity_file=None, + rm_old=False, + chmod=False, + source_path=None): + # type (str, str, str, bool, bool, str) -> None + """SCP to Zanata server""" + local_path = source_path if source_path else self.local_path + if not local_path: + raise AssertionError("source_path is missing") + if not os.path.isfile(local_path): + raise AssertionError(local_path + " does not exist") + + if dest_path: + target_path = dest_path + else: + target_path = ( + '/var/opt/rh/eap7/lib/wildfly/standalone/deployments/' + 'zanata.war') + + ssh_host = SshHost(dest_host, identity_file) + ssh_host.scp_to_remote(local_path, target_path, True, rm_old) + + if chmod: + ssh_host.run_check_call( + "chown jboss:jboss %s" % target_path, True) + + +def show_download_link(): + # type () -> None + """Show the download link of last successful build""" + server = JenkinsHelper.JenkinsServer.init_default() + + war = ZanataWar(ZanataWar.get_last_successful_build_url( + server, args.job, args.folder, args.branch)) + print(war.download_url) + + +def download_from_jenkins(): + # type () -> None + """Handling download-from-jenkins command + (download last successful build)""" + server = JenkinsHelper.JenkinsServer.init_default() + + war = ZanataWar(ZanataWar.get_last_successful_build_url( + server, args.job, args.folder, args.branch)) + war.download( + None if not args.local_path else os.path.basename(args.local_path), + None if not args.local_path else os.path.dirname(args.local_path)) + + +def scp_to_server(): + # type () -> None + """Handling scp-to-server command""" + server = JenkinsHelper.JenkinsServer.init_default() + + war = ZanataWar( + ZanataWar.get_last_successful_build_url( + server, args.job, args.folder, args.branch), + args.local_path) + + war.scp_to_server(args.host, args.dest_path, args.identity_file) + + +def deploy_local_war(war=None): + # type () -> None + """Deploy a build local WAR to zanata server. + This assumes login as root""" + if not war: + if not args.war_file: + raise AssertionError("args.war_file is missing") + war = ZanataWar(local_path=args.war_file) + + ssh_host = SshHost(args.host, args.identity_file) + ssh_host.run_check_call("systemctl stop eap7-standalone", True) + war.scp_to_server( + args.host, args.dest_path, args.identity_file, + True, True) + + ssh_host.run_check_call("systemctl start eap7-standalone", True) + + +def deploy(): + # type () -> None + """Download the last successfully built WAR, then deploy + the WAR file to zanata server. + This assumes login as root""" + server = JenkinsHelper.JenkinsServer.init_default() + + war = ZanataWar( + ZanataWar.get_last_successful_build_url( + server, args.job, args.folder, args.branch), + args.local_path) + + war.download( + None if not args.local_path else os.path.basename(args.local_path), + None if not args.local_path else os.path.dirname(args.local_path)) + + deploy_local_war(war) + + +def parse(): + # type () -> None + """Parse options and arguments""" + parser = argparse.ArgumentParser(description='WAR file functions') + job_parent_parser = JenkinsHelper.JenkinsServer.create_parent_parser() + war_parent_parser = ZanataWar.create_parent_parser() + ssh_parent_parser = SshHost.create_parent_parser() + + subparsers = parser.add_subparsers( + title='Command', description='Valid commands', + help='Command help') + + show_download_link_parser = subparsers.add_parser( + 'show-download-link', + help='Show download link', + parents=[job_parent_parser, war_parent_parser]) + show_download_link_parser.set_defaults(func=show_download_link) + + download_from_jenkins_parser = subparsers.add_parser( + 'download-from-jenkins', + help='Download from jenkins', + parents=[job_parent_parser, war_parent_parser]) + download_from_jenkins_parser.set_defaults(func=download_from_jenkins) + + scp_to_server_parser = subparsers.add_parser( + 'scp-to-server', + help='scp to server', + parents=[job_parent_parser, war_parent_parser, ssh_parent_parser]) + scp_to_server_parser.set_defaults(func=scp_to_server) + + deploy_local_war_parser = subparsers.add_parser( + 'deploy-local-war', + help="""deploy the existing local WAR to server + (assuming you are able to sudo)""", + parents=[ssh_parent_parser]) + deploy_local_war_parser.add_argument( + 'war_file', type=str, + help='Local WAR file to be upload') + deploy_local_war_parser.set_defaults(func=deploy_local_war) + + deploy_parser = subparsers.add_parser( + 'deploy', + help="""deploy the last successful built WAR to server + (assuming you are able to sudo)""", + parents=[job_parent_parser, war_parent_parser, ssh_parent_parser]) + deploy_parser.set_defaults(func=deploy) + return parser.parse_args() + + +if __name__ == '__main__': + # Set logging + logging_init() + + args = parse() # pylint: disable=invalid-name + args.func()