From bdc4534e33464588096025a009caac01f9e44e13 Mon Sep 17 00:00:00 2001 From: Alan Milligan Date: Thu, 20 May 2021 10:22:00 +1000 Subject: [PATCH 1/2] replace all unix-specific subprocess calls such that app runs on Windows --- afctl/_version.py | 2 +- afctl/parser_helpers.py | 26 ++++----- afctl/parsers.py | 10 ++-- .../deployments/docker/deployment_config.py | 8 +-- .../deployments/qubole/qubole_utils.py | 19 ++----- afctl/templates/dag_template.py | 4 +- .../deployment_tests/test_local_deployment.py | 16 +++--- afctl/tests/parser_tests/test_parser_utils.py | 54 ++++++++++--------- afctl/tests/utils.py | 7 ++- afctl/utils.py | 29 ++++++---- requirements.txt | 4 +- 11 files changed, 95 insertions(+), 84 deletions(-) diff --git a/afctl/_version.py b/afctl/_version.py index c469247..8f07417 100644 --- a/afctl/_version.py +++ b/afctl/_version.py @@ -495,7 +495,7 @@ def get_versions(): # versionfile_source is the relative path from the top of the source # tree (where the .git directory might live) to this file. Invert # this to find the root from __file__. - for i in cfg.versionfile_source.split('/'): + for i in cfg.versionfile_source.split(os.path.sep): root = os.path.dirname(root) except NameError: return {"version": "0+unknown", "full-revisionid": None, diff --git a/afctl/parser_helpers.py b/afctl/parser_helpers.py index b41b24d..9762118 100644 --- a/afctl/parser_helpers.py +++ b/afctl/parser_helpers.py @@ -1,8 +1,11 @@ from afctl.utils import Utility import os -import subprocess +import shutil from termcolor import colored from afctl.exceptions import AfctlParserException +import git + +SEP = os.path.sep class ParserHelpers(): @@ -10,7 +13,7 @@ class ParserHelpers(): def get_project_file_names(name): try: pwd = os.getcwd() - main_dir = pwd if name == '.' else os.path.join(pwd, name.lstrip('/').rstrip('/')) + main_dir = pwd if name == '.' 
else os.path.join(pwd, name.lstrip(SEP).rstrip(SEP)) project_name = os.path.basename(main_dir) config_dir = Utility.CONSTS['config_dir'] config_file = Utility.project_config(project_name) @@ -35,11 +38,10 @@ def get_project_file_names(name): @staticmethod def add_git_config(files): try: - origin = subprocess.run(['git', '--git-dir={}'.format(os.path.join(files['main_dir'], '.git')), 'config', - '--get', 'remote.origin.url'],stdout=subprocess.PIPE) - origin = origin.stdout.decode('utf-8')[:-1] - if origin == '': - subprocess.run(['git', 'init', files['main_dir']]) + try: + origin = git.Repo(os.path.join(files['main_dir'])) + except git.exc.InvalidGitRepositoryError: + origin = git.Repo.init(os.path.join(files['main_dir'])) print(colored("Git origin is not set for this repository. Run 'afctl config global -o '", 'yellow')) else: print("Updating git origin.") @@ -71,9 +73,8 @@ def generate_all(files): #STEP - 2: create config file ParserHelpers.generate_config_file(files) - subprocess.run(['cp', '{}/templates/gitignore.txt'.format(os.path.dirname(os.path.abspath(__file__))), - sub_file['.gitignore']]) - + shutil.copyfile('{}/templates/gitignore.txt'.format(os.path.dirname(os.path.abspath(__file__))), + sub_file['.gitignore']) except Exception as e: raise AfctlParserException(e) @@ -81,9 +82,8 @@ def generate_all(files): @staticmethod def generate_config_file(files): try: - subprocess.run(['cp', '{}/plugins/deployments/deployment_config.yml'.format(os.path.dirname(os.path.abspath(__file__))), - files['config_file']]) - + shutil.copyfile('{}/plugins/deployments/deployment_config.yml'.format(os.path.dirname(os.path.abspath(__file__))), + files['config_file']) ParserHelpers.add_git_config(files) except Exception as e: diff --git a/afctl/parsers.py b/afctl/parsers.py index 6d45bda..456e255 100644 --- a/afctl/parsers.py +++ b/afctl/parsers.py @@ -5,7 +5,6 @@ from afctl import __version__ from afctl.utils import Utility from afctl.exceptions import AfctlParserException 
-import subprocess from afctl.plugins.deployments.deployment_config import DeploymentConfig from afctl.parser_helpers import ParserHelpers from termcolor import colored @@ -123,9 +122,10 @@ def generate(cls, args): elif args.type == "module": path = "{}/{}/dags/{}".format(project_path, project_name, args.n) test_path = "{}/tests/{}".format(project_path, args.n) - mod_val = subprocess.call(['mkdir', path]) - test_val = subprocess.call(['mkdir', test_path]) - if mod_val != 0 or test_val != 0: + try: + os.makedirs(path, exist_ok=True) + os.makedirs(test_path, exist_ok=True) + except: cls.parser.error(colored("Unable to generate.", 'red')) print(colored("Generated successfully.", 'green')) @@ -286,4 +286,4 @@ def act_on_configs(cls, args, project_name): Utility.print_file(Utility.project_config(project_name)) except Exception as e: - AfctlParserException(e) \ No newline at end of file + AfctlParserException(e) diff --git a/afctl/plugins/deployments/docker/deployment_config.py b/afctl/plugins/deployments/docker/deployment_config.py index 51970ce..99023d8 100644 --- a/afctl/plugins/deployments/docker/deployment_config.py +++ b/afctl/plugins/deployments/docker/deployment_config.py @@ -4,7 +4,7 @@ import os import yaml from afctl.utils import Utility -import subprocess +import docker # Yaml Structure # deployment: @@ -47,8 +47,10 @@ def deploy_project(cls, args, config_file): with open(Utility.project_config(config_file)) as file: config = yaml.full_load(file) - val = subprocess.call(['docker', 'info']) - if val != 0: + try: + client = docker.from_env() + client.info() + except: return True, "Docker is not running. Please start docker." 
if args.d: diff --git a/afctl/plugins/deployments/qubole/qubole_utils.py b/afctl/plugins/deployments/qubole/qubole_utils.py index 656bda5..14af18f 100644 --- a/afctl/plugins/deployments/qubole/qubole_utils.py +++ b/afctl/plugins/deployments/qubole/qubole_utils.py @@ -1,25 +1,17 @@ -import subprocess from afctl.exceptions import AfctlDeploymentException from mako.template import Template from qds_sdk.qubole import Qubole from qds_sdk.commands import ShellCommand from urllib.parse import urlparse - +import git class QuboleUtils(): @staticmethod def fetch_latest_commit(origin, branch): try: - commit = subprocess.run(['git', 'ls-remote', origin, 'refs/heads/{}'.format(branch), '|', 'cut', '-f', '1'], - stdout=subprocess.PIPE) - commit = commit.stdout.decode('utf-8') - - if commit == '': - return None - - return commit.split('\t')[0] - + repo = git.Repo('.') + return repo.remotes.origin.refs[0].commit.hexsha except Exception as e: raise AfctlDeploymentException(e) @@ -95,9 +87,8 @@ def generate_configs(configs, args): env = "{}/api".format(config['env'].rstrip('/')) cluster = config['cluster'] token = config['token'] - branch = subprocess.run(['git', 'symbolic-ref', '--short', 'HEAD'], stdout=subprocess.PIPE).stdout.decode( - 'utf-8')[:-1] - + repo = git.Repo('.') + branch = repo.active_branch.name return { 'name': name, 'env': env, diff --git a/afctl/templates/dag_template.py b/afctl/templates/dag_template.py index cca00fa..eb36e5b 100644 --- a/afctl/templates/dag_template.py +++ b/afctl/templates/dag_template.py @@ -8,8 +8,8 @@ def dag_template(name, config_name): default_args = { 'owner': '${config_name}', +'start_date': datetime.now() - timedelta(days=1), # 'depends_on_past': , -# 'start_date': , # 'email': , # 'email_on_failure': , # 'email_on_retry': , @@ -23,4 +23,4 @@ def dag_template(name, config_name): """ ) - return template.render_unicode(name=name, config_name=config_name) \ No newline at end of file + return template.render_unicode(name=name, 
config_name=config_name) diff --git a/afctl/tests/deployment_tests/test_local_deployment.py b/afctl/tests/deployment_tests/test_local_deployment.py index 217aa4f..e20547e 100644 --- a/afctl/tests/deployment_tests/test_local_deployment.py +++ b/afctl/tests/deployment_tests/test_local_deployment.py @@ -1,7 +1,9 @@ from afctl.plugins.deployments.docker.deployment_config import DockerDeploymentConfig from afctl.tests.utils import clean_up, PROJECT_NAME, PROJECT_CONFIG_DIR import pytest -import os, subprocess +import os, pathlib, tempfile + +TMP = tempfile.gettempdir() class TestLocalDeployment: @@ -10,12 +12,12 @@ class TestLocalDeployment: def create_project(self): clean_up(PROJECT_NAME) clean_up(PROJECT_CONFIG_DIR) - main_dir = os.path.join('/tmp', PROJECT_NAME) - subprocess.run(['mkdir', main_dir]) - subprocess.run(['mkdir', PROJECT_CONFIG_DIR]) - subprocess.run(['mkdir', os.path.join(main_dir, 'deployments')]) + main_dir = os.path.join(TMP, PROJECT_NAME) + os.makedirs(main_dir, exist_ok=True) + os.makedirs(PROJECT_CONFIG_DIR, exist_ok=True) + os.makedirs(os.path.join(main_dir, 'deployments'), exist_ok=True) config_file = "{}.yml".format(os.path.join(PROJECT_CONFIG_DIR, PROJECT_NAME)) - subprocess.run(['touch', config_file]) + pathlib.Path(config_file).touch() config_file_content = """ global: airflow_version: @@ -51,4 +53,4 @@ def test_docker_compose_generation(self, create_project): current_output = open(config_file).read() expected_output = expected_output.replace(" ", "") current_output = current_output.replace(" ", "") - assert expected_output == current_output \ No newline at end of file + assert expected_output == current_output diff --git a/afctl/tests/parser_tests/test_parser_utils.py b/afctl/tests/parser_tests/test_parser_utils.py index 230944a..469f441 100644 --- a/afctl/tests/parser_tests/test_parser_utils.py +++ b/afctl/tests/parser_tests/test_parser_utils.py @@ -1,13 +1,17 @@ from afctl.utils import Utility import pytest -import os, subprocess +import 
os, pathlib, shutil, tempfile from afctl.tests.utils import create_path_and_clean, PROJECT_NAME, PROJECT_CONFIG_DIR, clean_up +SEP = os.path.sep +TMP = tempfile.gettempdir() +TMP_NO_SEP = TMP.replace(SEP,'') + class TestUtils: @pytest.fixture(scope='function') def clean_tmp_dir(self): - parent = ['/tmp'] + parent = [TMP] child = ['one', 'two', 'three'] create_path_and_clean(parent, child) yield @@ -15,26 +19,26 @@ def clean_tmp_dir(self): # create_dirs def test_create_dir(self, clean_tmp_dir): - parent = ['/tmp'] + parent = [TMP] child = ['one', 'two', 'three'] dirs = Utility.create_dirs(parent, child) - assert dirs['one'] == '/tmp/one' - assert os.path.exists(dirs['one']) is True - assert dirs['two'] == '/tmp/two' - assert os.path.exists(dirs['two']) is True - assert dirs['three'] == '/tmp/three' - assert os.path.exists(dirs['three']) is True + assert dirs['one'] == SEP.join([TMP, 'one']) + assert os.path.isdir(dirs['one']) + assert dirs['two'] == SEP.join([TMP, 'two']) + assert os.path.isdir(dirs['two']) + assert dirs['three'] == SEP.join([TMP, 'three']) + assert os.path.isdir(dirs['three']) # create_files def test_create_files(self, clean_tmp_dir): - parent = ['/tmp'] + parent = [TMP] child = ['one', 'two', 'three'] dirs = Utility.create_files(parent, child) - assert dirs['one'] == '/tmp/one' + assert dirs['one'] == SEP.join([TMP, 'one']) assert os.path.exists(dirs['one']) is True - assert dirs['two'] == '/tmp/two' + assert dirs['two'] == SEP.join([TMP, 'two']) assert os.path.exists(dirs['two']) is True - assert dirs['three'] == '/tmp/three' + assert dirs['three'] == SEP.join([TMP, 'three']) assert os.path.exists(dirs['three']) is True # project_config @@ -47,7 +51,7 @@ def test_return_project_config_file(self): # generate_dag_template def test_generate_dag_template(self): project_name = "tes_project" - path = "/tmp" + path = TMP dag = "test" Utility.generate_dag_template(project_name, dag, path) expected_output = """ @@ -56,8 +60,8 @@ def 
test_generate_dag_template(self): default_args = { 'owner': 'tes_project', +'start_date': datetime.now() - timedelta(days=1), # 'depends_on_past': , -# 'start_date': , # 'email': , # 'email_on_failure': , # 'email_on_retry': , @@ -76,26 +80,26 @@ def test_generate_dag_template(self): @pytest.fixture(scope='function') def create_project(self): - path = '/tmp/one/two/three' - subprocess.run(['mkdir', '-p', path]) - file_path = '/tmp/one/two/.afctl_project' - subprocess.run(['touch', file_path]) + path = os.path.sep.join([TMP, 'one', 'two', 'three']) + os.makedirs(path, exist_ok=True) + file_path = os.path.sep.join([TMP, 'one', 'two', '.afctl_project']) + pathlib.Path(file_path).touch() yield - subprocess.run(['rm', '-rf', path]) + shutil.rmtree(path) # find_project def test_find_project(self, create_project): - path = '/tmp/one/two/three' + path = os.path.sep.join([TMP, 'one', 'two', 'three']) project = Utility.find_project(path) assert project[0] == 'two' - assert project[1] == '/tmp/one/two' + assert project[1] == os.path.sep.join([TMP, 'one', 'two']) @pytest.fixture(scope='function') def create_config_file(self): - subprocess.run(['mkdir', PROJECT_CONFIG_DIR]) + os.mkdir(PROJECT_CONFIG_DIR) file_path = os.path.join(PROJECT_CONFIG_DIR, PROJECT_NAME)+'.yml' - subprocess.run(['touch', file_path]) + pathlib.Path(file_path).touch() yml_template = """ parent: child1: @@ -160,4 +164,4 @@ def test_add_and_update_configs(self, create_config_file): current_output = open(config_file).read() expected_output = expected_output.replace(" ", "") current_output = current_output.replace(" ", "") - assert expected_output == current_output \ No newline at end of file + assert expected_output == current_output diff --git a/afctl/tests/utils.py b/afctl/tests/utils.py index 305fc1e..3c70a70 100644 --- a/afctl/tests/utils.py +++ b/afctl/tests/utils.py @@ -1,5 +1,5 @@ import os -import subprocess +import shutil import itertools PROJECT_NAME = 'test_project' @@ -20,7 +20,10 @@ def 
__init__(self, type=None, origin=None, token=None, version=None, env=None, c def clean_up(project_file): if os.path.exists(project_file): - subprocess.run(['rm', '-rf', project_file]) + if os.path.isdir(project_file): + shutil.rmtree(project_file, ignore_errors=True) + else: + os.unlink(project_file) def check_paths(parent, child): diff --git a/afctl/utils.py b/afctl/utils.py index 876dc1f..cbb3572 100644 --- a/afctl/utils.py +++ b/afctl/utils.py @@ -1,11 +1,13 @@ import os import itertools -import subprocess +import pathlib import yaml from afctl.exceptions import AfctlUtilsException from afctl.templates.dag_template import dag_template from termcolor import colored +SEP = os.path.sep + class Utility(): CONSTS = { @@ -17,11 +19,12 @@ def create_dirs(parent, child): try: dirs = {} for dir1, dir2 in itertools.product(parent, child): - if not os.path.exists(os.path.join(dir1, dir2)): - os.mkdir(os.path.join(dir1, dir2)) + target = os.path.join(dir1, dir2) + if not os.path.exists(target): + os.makedirs(target) else: print("{} already exists. Skipping.".format(dir2)) - dirs[dir2] = os.path.join(dir1, dir2) + dirs[dir2] = target return dirs except Exception as e: raise AfctlUtilsException(e) @@ -33,7 +36,7 @@ def create_files(parent, child): files = {} for dir1, dir2 in itertools.product(parent, child): if not os.path.exists(os.path.join(dir1, dir2)): - subprocess.run(['touch', os.path.join(dir1, dir2)]) + pathlib.Path(os.path.join(dir1, dir2)).touch() else: print("{} already exists. 
Skipping.".format(dir2)) files[dir2] = os.path.join(dir1, dir2) @@ -65,7 +68,11 @@ def project_config(file): @staticmethod def print_file(file): - subprocess.call(['cat', file]) + try: + with open(file) as fh: + print(fh.read()) + finally: + fh.close() @staticmethod def update_config(file, config): @@ -100,11 +107,11 @@ def crawl_config(crawler, config): @staticmethod def find_project(pwd): - dirs = pwd.lstrip('/').split('/') - for i in range(len(dirs)+1): - path = '/'.join(dirs[:i]) - if os.path.exists(os.path.join('/'+path, '.afctl_project')): - return [dirs[i-1], '/'+path] + dirs = pwd.split(SEP) + for i in range(len(dirs), 0, -1): + path = SEP.join(dirs[:i]) + if os.path.exists(os.path.join(path, '.afctl_project')): + return [dirs[i-1], path] return None @staticmethod diff --git a/requirements.txt b/requirements.txt index de99664..2c07d03 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,4 +3,6 @@ versioneer==0.18 mako==1.1.0 qds-sdk==1.13.2 termcolor -pytest \ No newline at end of file +pytest +gitpython +docker From 85b4a840eebd498bda1981ee5b06ac96e6932edf Mon Sep 17 00:00:00 2001 From: Alan Milligan Date: Tue, 8 Jun 2021 13:07:27 +1000 Subject: [PATCH 2/2] remote(scp) implementation --- README.md | 35 +++- afctl/meta.yml | 1 + afctl/parsers.py | 9 +- .../plugins/deployments/deployment_config.py | 16 +- .../plugins/deployments/deployment_config.yml | 3 +- .../deployments/qubole/deployment_config.py | 2 +- afctl/plugins/deployments/scp/__init__.py | 0 .../deployments/scp/deployment_config.py | 176 ++++++++++++++++++ .../deployment_tests/test_scp_deployment.py | 99 ++++++++++ afctl/tests/parser_tests/test_parser_utils.py | 4 +- afctl/tests/parser_tests/test_parsers.py | 3 +- afctl/tests/utils.py | 6 +- 12 files changed, 336 insertions(+), 18 deletions(-) create mode 100644 afctl/plugins/deployments/scp/__init__.py create mode 100644 afctl/plugins/deployments/scp/deployment_config.py create mode 100644 afctl/tests/deployment_tests/test_scp_deployment.py 
diff --git a/README.md b/README.md index 9f21e11..82f3cf1 100644 --- a/README.md +++ b/README.md @@ -180,6 +180,35 @@ afctl config add -d qubole -n demo -e https://api.qubole.com -c airflow_1102 -t afctl deploy qubole -n ``` +### 7. Deploy project to a remote Airflow + +You can deploy your DAGs directly to a remote Airflow instance using SCP. Your login must have +enough permissions to write to the DAG folder; but since we don't know the installation, it isn't +possible to manage any Python requirements using this deployer. + +In order to configure remote deployment, you must pass host and DAG directory; you may set the +login account, and either a password or identity file. + +```bash +afctl config add -d remote -m airflow.example.com -c /var/lib/airflow/dags -e mypassword +``` +or (say) + +```bash +afctl config add -d remote -m airflow.example.com -c /var/lib/airflow/dags -u airflow -i ~/.ssh/id_rsa-airflow +``` + +This command will modify your config file. You can see your config file with the following command : +```bash +afctl config show +``` + +* To deploy run the following command +```bash +afctl deploy remote +``` + + ### The following video also contains all the steps of deploying project using afctl -
https://www.youtube.com/watch?v=A4rcZDGtJME&feature=youtu.be @@ -194,8 +222,9 @@ global: --access-token: deployment: -qubole: ---local: ----compose: +-local: +--compose: +-remote: ```
@@ -225,8 +254,6 @@ afctl -h ```
-### Caution -Not yet ported for Windows. #### Credits Docker-compose file : https://github.com/puckel/docker-airflow diff --git a/afctl/meta.yml b/afctl/meta.yml index 47389df..d8e528c 100644 --- a/afctl/meta.yml +++ b/afctl/meta.yml @@ -7,4 +7,5 @@ hooks: deployment: qubole local + remote diff --git a/afctl/parsers.py b/afctl/parsers.py index 456e255..51aa309 100644 --- a/afctl/parsers.py +++ b/afctl/parsers.py @@ -43,7 +43,7 @@ def init(cls, args): os.mkdir(files['config_dir']) if os.path.exists(files['main_dir']) and os.path.exists(files['config_file']): - cls.parser.error(colored("Project already exists. Please delete entry under /home/.afctl_congfis", 'red')) + cls.parser.error(colored("Project already exists. Please delete entry under ~/.afctl_config", 'red')) print(colored("Initializing new project...", 'green')) @@ -178,13 +178,16 @@ def get_subparsers(cls): , 'args': [ ['type', {'choices':['add', 'update', 'show', 'global']}], - ['-d', {'choices': ['qubole']}], + ['-d', {'choices': ['qubole', 'remote']}], + ['-m'], + ['-i'], ['-o'], ['-p'], ['-n'], ['-e'], ['-c'], ['-t'], + ['-u'], ['-v'] ] @@ -286,4 +289,4 @@ def act_on_configs(cls, args, project_name): Utility.print_file(Utility.project_config(project_name)) except Exception as e: - AfctlParserException(e) + raise AfctlParserException(e) diff --git a/afctl/plugins/deployments/deployment_config.py b/afctl/plugins/deployments/deployment_config.py index f40a6e9..9a7285e 100644 --- a/afctl/plugins/deployments/deployment_config.py +++ b/afctl/plugins/deployments/deployment_config.py @@ -1,24 +1,27 @@ from afctl.plugins.deployments.base_deployment_config import BaseDeploymentConfig from afctl.plugins.deployments.qubole.deployment_config import QuboleDeploymentConfig from afctl.plugins.deployments.docker.deployment_config import DockerDeploymentConfig +from afctl.plugins.deployments.scp.deployment_config import ScpDeploymentConfig from afctl.exceptions import AfctlDeploymentException class 
DeploymentConfig(BaseDeploymentConfig): # Just append configs for other deployments here. CONFIG_DETAILS = QuboleDeploymentConfig.CONFIG_PARSER_USAGE+\ - DockerDeploymentConfig.CONFIG_PARSER_USAGE + DockerDeploymentConfig.CONFIG_PARSER_USAGE+\ + ScpDeploymentConfig.CONFIG_PARSER_USAGE DEPLOY_DETAILS = QuboleDeploymentConfig.DEPLOY_PARSER_USAGE+\ - DockerDeploymentConfig.DEPLOY_PARSER_USAGE + DockerDeploymentConfig.DEPLOY_PARSER_USAGE+\ + ScpDeploymentConfig.DEPLOY_PARSER_USAGE @classmethod def validate_configs(cls, args): try: - if args.d == 'qubole': return QuboleDeploymentConfig.validate_configs(args) - + if args.d == 'remote': + return ScpDeploymentConfig.validate_configs(args) except Exception as e: raise AfctlDeploymentException(e) @@ -34,5 +37,8 @@ def deploy_project(cls, args, project_name, project_path): if args.type == "qubole": return QuboleDeploymentConfig.deploy_project(args, project_name) + if args.type == "remote": + return ScpDeploymentConfig.deploy_project(args, project_name) + except Exception as e: - raise AfctlDeploymentException(e) \ No newline at end of file + raise AfctlDeploymentException(e) diff --git a/afctl/plugins/deployments/deployment_config.yml b/afctl/plugins/deployments/deployment_config.yml index 9f7be15..09845f1 100644 --- a/afctl/plugins/deployments/deployment_config.yml +++ b/afctl/plugins/deployments/deployment_config.yml @@ -6,4 +6,5 @@ global: deployment: qubole: local: - compose: \ No newline at end of file + compose: + remote: diff --git a/afctl/plugins/deployments/qubole/deployment_config.py b/afctl/plugins/deployments/qubole/deployment_config.py index 61e40f3..0c3b7c9 100644 --- a/afctl/plugins/deployments/qubole/deployment_config.py +++ b/afctl/plugins/deployments/qubole/deployment_config.py @@ -127,4 +127,4 @@ def deploy_project(cls, args, config_file): return False, "" except Exception as e: - raise AfctlDeploymentException(e) \ No newline at end of file + raise AfctlDeploymentException(e) diff --git 
a/afctl/plugins/deployments/scp/__init__.py b/afctl/plugins/deployments/scp/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/afctl/plugins/deployments/scp/deployment_config.py b/afctl/plugins/deployments/scp/deployment_config.py new file mode 100644 index 0000000..bbedad8 --- /dev/null +++ b/afctl/plugins/deployments/scp/deployment_config.py @@ -0,0 +1,176 @@ +from afctl.plugins.deployments.base_deployment_config import BaseDeploymentConfig +from afctl.exceptions import AfctlDeploymentException +import os +import yaml +from afctl.utils import Utility +import getpass +import paramiko +import scp + +SEP = os.path.sep + +# Yaml Structure +# deployment: +# remote: +# host: +# dagdir: +# username: +# password: +# identity: + +def unixify_path(p): + """ target path MUST be Unix - cos airflow doesn't run on Windoz """ + return p.replace(SEP, '/') + +class ScpDeploymentConfig(BaseDeploymentConfig): + CONFIG_PARSER_USAGE = \ + ' [ remote ]\n' +\ + ' -m : airflow hostname\n' +\ + ' -c : airflow DAG directory\n' +\ + ' -u : login user (defaults to current user) \n' +\ + ' -e : login password\n' +\ + ' -i : identity file path (RSA private key)' +\ + ' You should have one of -p or -i.\n' + + DEPLOY_PARSER_USAGE = \ + ' [remote] - Deploy your project to remote Airflow instance.\n' + + + @classmethod + def validate_configs(cls, args): + try: + config = {} + # No argument is provided. So we will ask for the input from the user. 
+ if args.m is None and args.c is None: + config['host'] = input("Enter Airflow host : ") + config['dagdir'] = input("Enter Airflow DAG dir : ") + username = input("Enter remote username (empty if current): ") + if username: + config['username'] = username + password = input("Enter remote password (empty if identity file): ") + if password: + config['password'] = password + identity = input("Enter remote identity - RSA private key (empty if password) : ") + if identity: + config['identity'] = identity + + # If update return the entire path because we are not sure if he has updated everything or only some values. + if args.type == 'update': + return {'deployment':{'remote': config}}, False, "" + + # In add just append this to your parent. + if args.type == 'add': + return {'remote': config}, False, "" + + # Some arguments are given by the user. So don't ask for input. + else: + + # Name of connection is compulsory in this flow. + if args.m is None: + return config, True, "Airflow host/IP required. Check usage." + + if args.c is None: + return config, True, "Airflow DAG directory required. Check usage." + + config['host'] = args.m + config['dagdir'] = args.c + + if args.u is not None: + config['username'] = args.u + + if args.e is not None: + config['password'] = args.e + + if args.i is not None: + config['identity'] = args.i + + # For adding a new connection you need to provide at least these configs + if args.type == 'add' and (args.m is None or args.c is None): + return {}, True, "Arguments are required to add a new config. Check usage." + + if args.type == 'update': + return {'deployment':{'remote': config}}, False, "" + if args.type == "add": + return {'remote': config}, False, "" + + return {}, False, "Some error has occurred." 
+ + except Exception as e: + raise AfctlDeploymentException(e) + + @classmethod + def generate_dirs(cls, main_dir): + pass + + @classmethod + def deploy_project(cls, args, config_file): + try: + project, project_path = Utility.find_project(os.getcwd()) + except TypeError: + # hmmm - it returned None + print("No project found!") + return + + dags = [] + + try: + print("Deploying afctl project (%s) to remote" % project) + + with open(Utility.project_config(config_file)) as file: + config = yaml.full_load(file) + remote_dagdir = os.path.join(config['deployment']['remote']['dagdir']) + for root, dirs, files in os.walk(project_path, topdown=True, followlinks=False): + # TODO - assure it's proper directory/file hierarchy + if root.find('%sdags%s' % (SEP,SEP)) != -1: + for dagname in files: + if dagname.endswith('.py'): + dags.append((os.path.join(root, dagname), + unixify_path(os.path.join(remote_dagdir, dagname)))) + + if dags is []: + print("%s: no DAGs to ship" % project) + return + + ssh_client = paramiko.SSHClient() + ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh_client.load_system_host_keys() + + args = { + 'hostname': config['deployment']['remote']['host'], + 'username': config['deployment']['remote'].get('username', getpass.getuser()), + } + # look for user/password; otherwise go with ssh keys + # TODO - maybe just a 'connect' label in the yaml; and pass exactly that as paramiko args ... 
+ if config['deployment']['remote'].get('password', None) is None: + pkey_path = config['deployment']['remote'].get('identity', + os.path.join(os.path.expanduser("~"), '.ssh', 'id_rsa')) + try: + args['pkey'] = paramiko.RSAKey.from_private_key_file(pkey_path) + except FileNotFoundError: + raise FileNotFoundError('identity file missing: %s' % pkey_path) + except paramiko.ssh_exception.SSHException as e: + raise paramiko.ssh_exception.SSHException('identity file invalid: %s\n%s' % (pkey_path, str(e))) + #args['key_filename'] = pkey_path + else: + args['password'] = config['deployment']['remote']['password'] + try: + ssh_client.connect(**args) + except paramiko.BadHostKeyException as e: + raise + except paramiko.ssh_exception.AuthenticationException: + if args.get('password', None) is not None: + args['password'] = '*******' + raise paramiko.ssh_exception.AuthenticationException('Authentication Failure: %s' % args) + + with scp.SCPClient(ssh_client.get_transport()) as scp_client: + for local_file, remote_file in dags: + scp_client.put(local_file, remote_file) + scp_client.close() + + ssh_client.close() + + return False, "" + + except Exception as e: + raise AfctlDeploymentException(e) + diff --git a/afctl/tests/deployment_tests/test_scp_deployment.py b/afctl/tests/deployment_tests/test_scp_deployment.py new file mode 100644 index 0000000..71ba185 --- /dev/null +++ b/afctl/tests/deployment_tests/test_scp_deployment.py @@ -0,0 +1,99 @@ +from afctl.plugins.deployments.scp.deployment_config import ScpDeploymentConfig +from afctl.tests.utils import clean_up, PROJECT_NAME +import pytest +import os, pathlib, tempfile +import paramiko + +TMP = tempfile.gettempdir() +PROJECT_CONFIG_DIR=os.path.join(TMP, PROJECT_NAME, '.afctl_config') +PROJECT_CONFIG_FILE="{}.yml".format(os.path.join(PROJECT_CONFIG_DIR, PROJECT_NAME)) +HOME = os.path.expanduser('~') + + +class DummyArgParse: + + def __init__(self, type=None, host=None, dagdir=None, username=None, password=None, identity=None): 
+ self.type = type + self.m = host + self.c = dagdir + self.u = username + self.e = password + self.i = identity + + +class TestSCPDeployment: + + @pytest.fixture(scope='function') + def create_project(self): + main_dir = os.path.join(TMP, PROJECT_NAME) + clean_up(main_dir) + os.makedirs(main_dir, exist_ok=True) + os.makedirs(PROJECT_CONFIG_DIR, exist_ok=True) + os.makedirs(os.path.join(main_dir, 'deployments'), exist_ok=True) + os.makedirs(os.path.join(main_dir, 'dags'), exist_ok=True) + public_keypath = os.path.join(HOME, '.ssh', 'id_rsa.pub') + private_keypath = os.path.join(HOME, '.ssh', 'id_rsa') + config_file_content = """--- +global: + airflow_version: + git: + origin: + access-token: +deployment: + qubole: + local: + compose: + remote: + host: localhost + dagdir: %s + username: %s + password: %s +""" % (os.path.join(main_dir, 'dags'), public_keypath, private_keypath) + + with open(PROJECT_CONFIG_FILE, 'w') as file: + file.write(config_file_content) + + #key = paramiko.RSAKey.generate(1024) + #key.write_private_key_file(private_keypath) + + #with open(public_keypath, "w") as public: + # public.write("%s %s" % (key.get_name(), key.get_base64())) + #public.close() + + yield main_dir + #clean_up(main_dir) + + def test_scp(self, create_project): + # wtf - Utility.project_config() futzes with path suffix ... + ScpDeploymentConfig.deploy_project({}, PROJECT_CONFIG_FILE.replace('.yml','')) + + def test_validate_configs_on_add_pass(self): + args = DummyArgParse(type='add', host='localhost', dagdir='/var/lib/airflow/dags') + val = ScpDeploymentConfig.validate_configs(args) + assert val[0] == {'remote': {'host': 'localhost', 'dagdir': '/var/lib/airflow/dags'}} + assert val[1] is False + assert val[2] == '' + + def test_validate_configs_on_add_fail(self): + args = DummyArgParse(type='add', host='localhost') + val = ScpDeploymentConfig.validate_configs(args) + assert val[0] == {} + assert val[1] is True + assert val[2] == 'Airflow DAG directory required. Check usage.' 
+ + def test_validate_configs_on_update_pass(self): + args = DummyArgParse(type='update', host='localhost', dagdir='/var/lib/airflow/dags') + val = ScpDeploymentConfig.validate_configs(args) + assert val[0] == {'deployment': + {'remote': {'host': 'localhost', + 'dagdir': '/var/lib/airflow/dags'}}} + assert val[1] is False + assert val[2] == '' + + def test_validate_configs_on_update_fail(self): + args = DummyArgParse(type='update', host='localhost') + val = ScpDeploymentConfig.validate_configs(args) + assert val[0] == {} + assert val[1] is True + assert val[2] == 'Airflow DAG directory required. Check usage.' + diff --git a/afctl/tests/parser_tests/test_parser_utils.py b/afctl/tests/parser_tests/test_parser_utils.py index 469f441..ef2a65f 100644 --- a/afctl/tests/parser_tests/test_parser_utils.py +++ b/afctl/tests/parser_tests/test_parser_utils.py @@ -41,7 +41,7 @@ def test_create_files(self, clean_tmp_dir): assert dirs['three'] == SEP.join([TMP, 'three']) assert os.path.exists(dirs['three']) is True - # project_config + # project config dir def test_return_project_config_file(self): project = "test_project" expected_path = os.path.join(PROJECT_CONFIG_DIR, project)+".yml" @@ -97,7 +97,7 @@ def test_find_project(self, create_project): @pytest.fixture(scope='function') def create_config_file(self): - os.mkdir(PROJECT_CONFIG_DIR) + os.makedirs(PROJECT_CONFIG_DIR, exist_ok=True) file_path = os.path.join(PROJECT_CONFIG_DIR, PROJECT_NAME)+'.yml' pathlib.Path(file_path).touch() yml_template = """ diff --git a/afctl/tests/parser_tests/test_parsers.py b/afctl/tests/parser_tests/test_parsers.py index b9fc89e..6306f8c 100644 --- a/afctl/tests/parser_tests/test_parsers.py +++ b/afctl/tests/parser_tests/test_parsers.py @@ -46,8 +46,9 @@ def test_global_configs(self, create_parser): qubole: null local: compose: null + remote: null """ current_output = open(config_file).read() expected_output = expected_output.replace(" ", "") current_output = current_output.replace(" ", "") - 
assert expected_output == current_output \ No newline at end of file + assert expected_output == current_output diff --git a/afctl/tests/utils.py b/afctl/tests/utils.py index 3c70a70..11e00dd 100644 --- a/afctl/tests/utils.py +++ b/afctl/tests/utils.py @@ -1,10 +1,14 @@ import os import shutil import itertools +import tempfile PROJECT_NAME = 'test_project' -PROJECT_CONFIG_DIR = os.path.join(os.path.expanduser("~"), '.afctl_config') +PROJECT_CONFIG_DIR=os.path.join(tempfile.gettempdir(), PROJECT_NAME, '.afctl_config') +# monkey patch for testing (otherwise test suite zaps your .afctl_config!! +from afctl.utils import Utility +Utility.CONSTS['config_dir'] = PROJECT_CONFIG_DIR class DummyArgParse: