From c4a9983456b29f92e8f555ffdfca0924ba49cabc Mon Sep 17 00:00:00 2001 From: tirami-su Date: Tue, 19 Jul 2016 17:58:11 +0200 Subject: [PATCH 1/5] The cluster could be configured using a JSON configuration file. --- keys/README.md | 1 + spark_ec2.py | 97 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 98 insertions(+) create mode 100644 keys/README.md diff --git a/keys/README.md b/keys/README.md new file mode 100644 index 00000000..8f90f6e3 --- /dev/null +++ b/keys/README.md @@ -0,0 +1 @@ +This folder contains the identity files. \ No newline at end of file diff --git a/spark_ec2.py b/spark_ec2.py index 95e1f584..f4a296fa 100644 --- a/spark_ec2.py +++ b/spark_ec2.py @@ -31,6 +31,7 @@ import random import shutil import string +import json from stat import S_IRUSR import subprocess import sys @@ -167,6 +168,97 @@ class UsageError(Exception): pass +def parse_opt_args(parser, args=None): + if not args: + args = sys.argv[1:] + + opts, _ = parser.parse_args(args=args) + return opts + + +def process_conf_file(file_path, parser, opts): + configuration = {} + + try: + with open(file_path) as configuration_file: + configuration = json.load(configuration_file) + except Exception as e: + err_msg = " ".join(map(str, e.args)) + print("[!] Error when loading config file: {}".format(err_msg), file=stderr) + sys.exit(-1) + + JSON_SPECIAL_PARAMS = { + "no_ganglia": "--no-ganglia", + "delete_groups": "--delete_groups", + "private_ips": "--private-ips", + "resume": "--resume", + "use_existing_master": "--use-existing-master", + "copy_aws_credentials": "--copy-aws-credentials", + } + + JSON_PARAMS = { + "slaves": "--slaves", + "wait": "--wait", + "key_pair": "--key-pair", + "identity_file": "--identity-file", + "profile": "--profile", + "instance_type": "--instance-type", + "master_instance_type": "--master-instance-type", + "region": "--region", + "zone": "--zone", + "ami": "--ami", + "spark_version": "--spark-version", + "spark_git_repo": "--spark-git-repo", + "spark_ec2_git_repo": "--spark-ec2-git-repo", + "spark_ec2_git_branch": "--spark-ec2-git-branch", + "deploy_root_dir": "--deploy-root-dir", + "hadoop_major_version": "--hadoop-major-version", + "D": "-D", + "ebs_vol_size": "--ebs-vol-size", + "ebs_vol_type": "--ebs-vol-type", + "ebs_vol_num": "--ebs-vol-num", + "placement_group": "--placement-group", + "swap": "--swap", + "spot_price": "--spot-price", + "user": "--user", + "worker_instances": "--worker-instances", + "master_opts": "--master-opts", + "user_data": "--user-data", + "authorized_address": "--authorized-address", + "additional_security_group": "--additional-security-group", + "additional_tags": "--additional-tags", + "subnet_id": "--subnet-id", + "vpc_id": "--vpc-id", + "instance_initiated_shutdown_behavior": "--instance-initiated-shutdown-behavior", + "instance_profile_name": "--instance-profile-name" + } + + configuration["identity_file"] = os.path.join(os.path.dirname(os.path.realpath(__file__)), + "keys", + configuration["identity_file"]) + + if "credentials" in configuration: + try: + key_id = configuration["credentials"]["aws_access_key_id"] + access_key = configuration["credentials"]["aws_secret_access_key"] + os.environ["AWS_ACCESS_KEY_ID"] = key_id + os.environ["AWS_SECRET_ACCESS_KEY"] = access_key + except KeyError: + pass + + args = [] + for op in configuration: + if op in JSON_SPECIAL_PARAMS and configuration[op]: + args.append(JSON_SPECIAL_PARAMS[op]) + elif op in JSON_PARAMS: + option_value = "{opt} {val}".format(opt=JSON_PARAMS[op], + val=configuration[op]) 
+            args.extend(option_value.split())
+
+    new_opts, _ = parser.parse_args(args)
+    return new_opts
+
+
 # Configure and parse our command-line arguments
 def parse_args():
     parser = OptionParser(
@@ -327,8 +419,13 @@ def parse_args():
     parser.add_option(
         "--instance-profile-name", default=None,
         help="IAM profile name to launch instances under")
+    parser.add_option("-c", "--conf-file", default=None,
+                      help="Specify config file", metavar="FILE")
 
     (opts, args) = parser.parse_args()
+    if opts.conf_file:
+        opts = process_conf_file(opts.conf_file, parser, opts)
+
     if len(args) != 2:
         parser.print_help()
         sys.exit(1)

From 45008d9178d7c8382bee0be75409930e0b304374 Mon Sep 17 00:00:00 2001
From: tirami-su
Date: Wed, 20 Jul 2016 10:10:36 +0200
Subject: [PATCH 2/5] sys.exit(-1) -> sys.exit(1); the config file check is now done after the action and cluster name are parsed.

---
 spark_ec2.py | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/spark_ec2.py b/spark_ec2.py
index f4a296fa..43421110 100644
--- a/spark_ec2.py
+++ b/spark_ec2.py
@@ -183,9 +183,9 @@ def process_conf_file(file_path, parser, opts):
         with open(file_path) as configuration_file:
             configuration = json.load(configuration_file)
     except Exception as e:
-        err_msg = " ".join(map(str, e.args))
+        err_msg = " ".join([str(err) for err in e.args])
         print("[!] Error when loading config file: {}".format(err_msg), file=stderr)
-        sys.exit(-1)
+        sys.exit(1)
 
     JSON_SPECIAL_PARAMS = {
         "no_ganglia": "--no-ganglia",
@@ -423,14 +423,15 @@ def parse_args():
                       help="Specify config file", metavar="FILE")
 
     (opts, args) = parser.parse_args()
-    if opts.conf_file:
-        opts = process_conf_file(opts.conf_file, parser, opts)
 
     if len(args) != 2:
         parser.print_help()
         sys.exit(1)
 
     (action, cluster_name) = args
 
+    if opts.conf_file:
+        opts = process_conf_file(opts.conf_file, parser, opts)
+
     # Boto config check
     # http://boto.cloudhackers.com/en/latest/boto_config_tut.html
     home_dir = os.getenv('HOME')

From c97ee21b2e5ab990cf0fcfb51472d10a5b57118c Mon Sep 17 00:00:00 2001
From: tirami-su
Date: Wed, 20 Jul 2016 10:46:59 +0200
Subject: [PATCH 3/5] Unused code removed; comments added.

---
 spark_ec2.py | 34 +++++++++++++++-------------------
 1 file changed, 15 insertions(+), 19 deletions(-)

diff --git a/spark_ec2.py b/spark_ec2.py
index 43421110..595ed3e4 100644
--- a/spark_ec2.py
+++ b/spark_ec2.py
@@ -168,17 +168,11 @@ class UsageError(Exception):
     pass
 
 
-def parse_opt_args(parser, args=None):
-    if not args:
-        args = sys.argv[1:]
-
-    opts, _ = parser.parse_args(args=args)
-    return opts
-
-
-def process_conf_file(file_path, parser, opts):
+def process_conf_file(file_path, parser):
+    """
+    Load configuration file and extract arguments.
+    """
     configuration = {}
-
     try:
         with open(file_path) as configuration_file:
             configuration = json.load(configuration_file)
@@ -187,7 +181,8 @@ def process_conf_file(file_path, parser, opts):
         print("[!] Error when loading config file: {}".format(err_msg), file=stderr)
         sys.exit(1)
 
-    JSON_SPECIAL_PARAMS = {
+    # True / False options parameters
+    json_special_params = {
         "no_ganglia": "--no-ganglia",
         "delete_groups": "--delete_groups",
         "private_ips": "--private-ips",
@@ -196,8 +191,9 @@ def process_conf_file(file_path, parser, opts):
         "copy_aws_credentials": "--copy-aws-credentials",
     }
 
-    JSON_PARAMS = {
-        "slaves": "--slaves",
+    # Options parameters followed by values
+    json_params = {
+        "slaves": "--slaves",
         "wait": "--wait",
         "key_pair": "--key-pair",
         "identity_file": "--identity-file",
         "profile": "--profile",
@@ -233,6 +229,7 @@ def process_conf_file(file_path, parser):
         "instance_profile_name": "--instance-profile-name"
     }
 
+    # Path to identity file (located in keys folder under current location of spark-ec2 file)
     configuration["identity_file"] = os.path.join(os.path.dirname(os.path.realpath(__file__)),
                                                   "keys",
                                                   configuration["identity_file"])
@@ -248,13 +245,12 @@ def process_conf_file(file_path, parser):
     args = []
     for op in configuration:
-        if op in JSON_SPECIAL_PARAMS and configuration[op]:
-            args.append(JSON_SPECIAL_PARAMS[op])
-        elif op in JSON_PARAMS:
-            option_value = "{opt} {val}".format(opt=JSON_PARAMS[op],
+        if op in json_special_params and configuration[op]:
+            args.append(json_special_params[op])
+        elif op in json_params:
+            option_value = "{opt} {val}".format(opt=json_params[op],
                                                 val=configuration[op])
             args.extend(option_value.split())
-
     new_opts, _ = parser.parse_args(args)
     return new_opts
 
@@ -430,7 +426,7 @@ def parse_args():
     (action, cluster_name) = args
 
     if opts.conf_file:
-        opts = process_conf_file(opts.conf_file, parser, opts)
+        opts = process_conf_file(opts.conf_file, parser)
 
     # Boto config check
     # http://boto.cloudhackers.com/en/latest/boto_config_tut.html

From 5d856da2c4c8368c772c163cf8aee5da4095f8bd Mon Sep 17 00:00:00 2001
From: tirami-su
Date: Wed, 27 Jul 2016 11:54:45 +0200
Subject: [PATCH 4/5] YAML configuration file, automatic mapping between command-line args and config file args.

---
 spark_ec2.py | 114 +++++++++++++++++++++++++++---------------------------
 1 file changed, 55 insertions(+), 59 deletions(-)

diff --git a/spark_ec2.py b/spark_ec2.py
index 595ed3e4..cb05c285 100644
--- a/spark_ec2.py
+++ b/spark_ec2.py
@@ -31,7 +31,6 @@ import random
 import shutil
 import string
-import json
 from stat import S_IRUSR
 import subprocess
 import sys
@@ -145,7 +144,21 @@ def setup_external_libs(libs):
             tar.close()
             os.remove(tgz_file_path)
             print(" - Finished downloading {lib}.".format(lib=lib["name"]))
-        sys.path.insert(1, lib_dir)
+
+        if lib["add-path"]:
+            lib["add-path"](lib_dir)
+        else:
+            sys.path.insert(1, lib_dir)
+
+
+def add_pyaml_path(lib_dir):
+    lib_py2 = os.path.join(lib_dir, "lib")
+    if os.path.exists(lib_py2) and sys.version < "3":
+        sys.path.insert(1, lib_py2)
+
+    lib_py3 = os.path.join(lib_dir, "lib3")
+    if os.path.exists(lib_py3) and sys.version >= "3":
+        sys.path.insert(1, lib_py3)
 
 
 # Only PyPI libraries are supported.
@@ -153,7 +166,14 @@ def setup_external_libs(libs):
     {
         "name": "boto",
         "version": "2.34.0",
-        "md5": "5556223d2d0cc4d06dd4829e671dcecd"
+        "md5": "5556223d2d0cc4d06dd4829e671dcecd",
+        "add-path": None
+    },
+    {
+        "name": "PyYAML",
+        "version": "3.11",
+        "md5": "f50e08ef0fe55178479d3a618efe21db",
+        "add-path": add_pyaml_path
     }
 ]
 
@@ -163,6 +183,7 @@ def setup_external_libs(libs):
 from boto.ec2.blockdevicemapping import BlockDeviceMapping, BlockDeviceType, EBSBlockDeviceType
 from boto import ec2
+import yaml
 
 
 class UsageError(Exception):
     pass
@@ -172,85 +193,60 @@ def process_conf_file(file_path, parser):
     """
     Load configuration file and extract arguments.
     """
+    # Loading the configuration file.
     configuration = {}
     try:
         with open(file_path) as configuration_file:
-            configuration = json.load(configuration_file)
+            configuration = yaml.safe_load(configuration_file)
     except Exception as e:
-        err_msg = " ".join([str(err) for err in e.args])
-        print("[!] Error when loading config file: {}".format(err_msg), file=stderr)
+        print("[!] An error occurred when loading the config file:\n{}".format(e), file=stderr)
         sys.exit(1)
 
-    # True / False options parameters
-    json_special_params = {
-        "no_ganglia": "--no-ganglia",
-        "delete_groups": "--delete_groups",
-        "private_ips": "--private-ips",
-        "resume": "--resume",
-        "use_existing_master": "--use-existing-master",
-        "copy_aws_credentials": "--copy-aws-credentials",
-    }
+    unneeded_opts = ("version", "help")
 
-    # Options parameters followed by values
-    json_params = {
-        "slaves": "--slaves",
-        "wait": "--wait",
-        "key_pair": "--key-pair",
-        "identity_file": "--identity-file",
-        "profile": "--profile",
-        "instance_type": "--instance-type",
-        "master_instance_type": "--master-instance-type",
-        "region": "--region",
-        "zone": "--zone",
-        "ami": "--ami",
-        "spark_version": "--spark-version",
-        "spark_git_repo": "--spark-git-repo",
-        "spark_ec2_git_repo": "--spark-ec2-git-repo",
-        "spark_ec2_git_branch": "--spark-ec2-git-branch",
-        "deploy_root_dir": "--deploy-root-dir",
-        "hadoop_major_version": "--hadoop-major-version",
-        "D": "-D",
-        "ebs_vol_size": "--ebs-vol-size",
-        "ebs_vol_type": "--ebs-vol-type",
-        "ebs_vol_num": "--ebs-vol-num",
-        "placement_group": "--placement-group",
-        "swap": "--swap",
-        "spot_price": "--spot-price",
-        "user": "--user",
-        "worker_instances": "--worker-instances",
-        "master_opts": "--master-opts",
-        "user_data": "--user-data",
-        "authorized_address": "--authorized-address",
-        "additional_security_group": "--additional-security-group",
-        "additional_tags": "--additional-tags",
-        "subnet_id": "--subnet-id",
-        "vpc_id": "--vpc-id",
-        "instance_initiated_shutdown_behavior": "--instance-initiated-shutdown-behavior",
-        "instance_profile_name": "--instance-profile-name"
-    }
+    # mapping_conf() aims to avoid maintaining a manual mapping dictionary.
+    # Transforms --option-like-this to option_like_this
+    def mapping_conf(opt):
+        normal_opt = str(opt).split("/")[-1]
+        trans_opt = normal_opt.strip("-")
+        trans_opt = trans_opt.replace("-", "_")
+        return {trans_opt: normal_opt}
+
+    map_conf = {}
+    [map_conf.update(mapping_conf(opt)) for opt in parser.option_list]
+    [map_conf.pop(unneeded_opt, None) for unneeded_opt in unneeded_opts]
 
     # Path to identity file (located in keys folder under current location of spark-ec2 file)
     configuration["identity_file"] = os.path.join(os.path.dirname(os.path.realpath(__file__)),
                                                   "keys",
                                                   configuration["identity_file"])
 
+    # Setting credentials to the environment to access AWS.
     if "credentials" in configuration:
         try:
             key_id = configuration["credentials"]["aws_access_key_id"]
             access_key = configuration["credentials"]["aws_secret_access_key"]
-            os.environ["AWS_ACCESS_KEY_ID"] = key_id
-            os.environ["AWS_SECRET_ACCESS_KEY"] = access_key
         except KeyError:
             pass
+        else:
+            os.environ["AWS_ACCESS_KEY_ID"] = key_id
+            os.environ["AWS_SECRET_ACCESS_KEY"] = access_key
 
+    # Creating the args from the values present in the configuration file.
     args = []
-    for op in configuration:
-        if op in json_special_params and configuration[op]:
-            args.append(json_special_params[op])
-        elif op in json_params:
-            option_value = "{opt} {val}".format(opt=json_params[op],
+    options = set(map_conf).intersection(configuration)
+    for op in options:
+        takes_value = parser.get_option(map_conf[op]).takes_value()
+        # Extend args with options that take a value, for example: slaves.
+        if takes_value:
+            option_value = "{opt} {val}".format(opt=map_conf[op],
                                                 val=configuration[op])
             args.extend(option_value.split())
+        elif configuration[op]:
+            # Options that don't take a value, like --copy-aws-credentials,
+            # are only added when they are set to True.
+            args.append(map_conf[op])
+
     new_opts, _ = parser.parse_args(args)
     return new_opts

From d7076403a287ab4d0d6bf7700a19acc86f3eb751 Mon Sep 17 00:00:00 2001
From: tirami-su
Date: Wed, 27 Jul 2016 18:07:23 +0200
Subject: [PATCH 5/5] No longer need to put keys in the keys/ folder. The key files need to be located under ~/.ssh/.

---
 keys/README.md | 1 -
 spark_ec2.py   | 8 ++++----
 2 files changed, 4 insertions(+), 5 deletions(-)
 delete mode 100644 keys/README.md

diff --git a/keys/README.md b/keys/README.md
deleted file mode 100644
index 8f90f6e3..00000000
--- a/keys/README.md
+++ /dev/null
@@ -1 +0,0 @@
-This folder contains the identity files.
\ No newline at end of file

diff --git a/spark_ec2.py b/spark_ec2.py
index cb05c285..64d67fc9 100644
--- a/spark_ec2.py
+++ b/spark_ec2.py
@@ -216,10 +216,10 @@ def mapping_conf(opt):
     [map_conf.update(mapping_conf(opt)) for opt in parser.option_list]
     [map_conf.pop(unneeded_opt, None) for unneeded_opt in unneeded_opts]
 
-    # Path to identity file (located in keys folder under current location of spark-ec2 file)
-    configuration["identity_file"] = os.path.join(os.path.dirname(os.path.realpath(__file__)),
-                                                  "keys",
-                                                  configuration["identity_file"])
+    # Generating path to identity file (located in ~/.ssh/)
+    home_dir = os.path.expanduser('~')
+    key_pair_path = os.path.join(home_dir, ".ssh", configuration["identity_file"])
+    configuration["identity_file"] = key_pair_path
 
     # Setting credentials to the environment to access AWS.
     if "credentials" in configuration:
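
For illustration, a sketch of a configuration file as the final state of this series would read it: keys are derived automatically from the command-line option names by stripping the leading dashes and turning "-" into "_" (so --instance-type becomes instance_type), flag options are plain booleans, and the optional credentials block is exported as the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables. All file names and values below are made-up examples, and identity_file is resolved to ~/.ssh/<value> by process_conf_file.

    # cluster.yml -- example values only
    slaves: 2
    key_pair: my-key-pair
    identity_file: my-key-pair.pem      # becomes ~/.ssh/my-key-pair.pem
    region: us-east-1
    instance_type: m3.large
    copy_aws_credentials: true          # flag option: added only when true
    credentials:
      aws_access_key_id: YOUR_ACCESS_KEY_ID
      aws_secret_access_key: YOUR_SECRET_ACCESS_KEY

Such a file would then be passed through the option added in the first patch, together with the usual action and cluster name, with something like:

    ./spark-ec2 --conf-file cluster.yml launch my-cluster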