Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[6.14.z] Pytest verifies and log-in into vault #12766

Merged
merged 1 commit into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,25 @@ jobs:

- name: Collect Tests
run: |
# To skip vault login in pull request checks
export VAULT_SECRET_ID_FOR_DYNACONF=somesecret
pytest --collect-only --disable-pytest-warnings tests/foreman/ tests/robottelo/
pytest --collect-only --disable-pytest-warnings -m pre_upgrade tests/upgrades/
pytest --collect-only --disable-pytest-warnings -m post_upgrade tests/upgrades/

- name: Collect Tests with xdist
run: |
# To skip vault login in pull request checks
export VAULT_SECRET_ID_FOR_DYNACONF=somesecret
pytest --collect-only --setup-plan --disable-pytest-warnings -n 2 tests/foreman/ tests/robottelo/
pytest --collect-only --setup-plan --disable-pytest-warnings -n 2 -m pre_upgrade tests/upgrades/
pytest --collect-only --setup-plan --disable-pytest-warnings -n 2 -m post_upgrade tests/upgrades/

- name: Run Robottelo's Tests
run: pytest -sv tests/robottelo/
run: |
# To skip vault login in pull request checks
export VAULT_SECRET_ID_FOR_DYNACONF=somesecret
pytest -sv tests/robottelo/

- name: Make Docs
run: |
Expand Down
1 change: 1 addition & 0 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

pytest_plugins = [
# Plugins
'pytest_plugins.auto_vault',
'pytest_plugins.disable_rp_params',
'pytest_plugins.external_logging',
'pytest_plugins.fixture_markers',
Expand Down
10 changes: 10 additions & 0 deletions pytest_plugins/auto_vault.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""Plugin enables pytest to notify and update the requirements"""
import subprocess

from robottelo.utils.vault import Vault


def pytest_addoption(parser):
"""Options to allow user to update the requirements"""
with Vault() as vclient:
vclient.login(stdout=subprocess.PIPE, stderr=subprocess.PIPE)
22 changes: 0 additions & 22 deletions robottelo/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,12 @@
# General utility functions which does not fit into other util modules OR
# Independent utility functions that doesnt need separate module
import base64
import os
from pathlib import Path
import re

from cryptography.hazmat.backends import default_backend as crypto_default_backend
from cryptography.hazmat.primitives import serialization as crypto_serialization
from cryptography.hazmat.primitives.asymmetric import rsa

from robottelo.constants import Colored
from robottelo.exceptions import InvalidVaultURLForOIDC


def export_vault_env_vars(filename=None, envdata=None):
if not envdata:
envdata = Path(filename or '.env').read_text()
vaulturl = re.findall('VAULT_URL_FOR_DYNACONF=(.*)', envdata)[0]

# Vault CLI Env Var
os.environ['VAULT_ADDR'] = vaulturl

# Dynaconf Vault Env Vars
if re.findall('VAULT_ENABLED_FOR_DYNACONF=(.*)', envdata)[0] == 'true':
if 'localhost:8200' in vaulturl:
raise InvalidVaultURLForOIDC(
f"{Colored.REDDARK}{vaulturl} doesnt supports OIDC login,"
"please change url to corp vault in env file!"
)


def gen_ssh_keypairs():
"""Generates private SSH key with its public key"""
Expand Down
114 changes: 114 additions & 0 deletions robottelo/utils/vault.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
"""Hashicorp Vault Utils where vault CLI is wrapped to perform vault operations"""
import json
import os
import re
import subprocess
import sys

from robottelo.exceptions import InvalidVaultURLForOIDC
from robottelo.logging import logger, robottelo_root_dir


class Vault:

HELP_TEXT = (
"Vault CLI in not installed in your system, "
"refer link https://learn.hashicorp.com/tutorials/vault/getting-started-install to "
"install vault CLI as per your system spec!"
)

def __init__(self, env_file='.env'):
self.env_path = robottelo_root_dir.joinpath(env_file)

def setup(self):
self.export_vault_addr()

def teardown(self):
del os.environ['VAULT_ADDR']

def export_vault_addr(self):
envdata = self.env_path.read_text()
vaulturl = re.findall('VAULT_URL_FOR_DYNACONF=(.*)', envdata)[0]

# Set Vault CLI Env Var
os.environ['VAULT_ADDR'] = vaulturl

# Dynaconf Vault Env Vars
if re.findall('VAULT_ENABLED_FOR_DYNACONF=(.*)', envdata)[0] == 'true':
if 'localhost:8200' in vaulturl:
raise InvalidVaultURLForOIDC(
f"{vaulturl} doesnt supports OIDC login,"
"please change url to corp vault in env file!"
)

def exec_vault_command(self, command: str, **kwargs):
"""A wrapper to execute the vault CLI commands

:param comamnd str: The vault CLI command
:param kwargs dict: Arguments to the subprocess run command to customize the run behavior
"""
vcommand = subprocess.run(command, shell=True, **kwargs) # capture_output=True
if vcommand.returncode != 0:
verror = str(vcommand.stderr)
if vcommand.returncode == 127:
logger.error(f"Error! {self.HELP_TEXT}")
sys.exit(1)
if vcommand.stderr:
if 'Error revoking token' in verror:
logger.info("Token is alredy revoked!")
elif 'Error looking up token' in verror:
logger.warning("Warning! Vault not logged in!")
else:
logger.error(f"Error! {verror}")
return vcommand

def login(self, **kwargs):
if 'VAULT_SECRET_ID_FOR_DYNACONF' not in os.environ:
if self.status(**kwargs).returncode != 0:
logger.warning(
"Warning! The browser is about to open for vault OIDC login, "
"close the tab once the sign-in is done!"
)
if (
self.exec_vault_command(command="vault login -method=oidc", **kwargs).returncode
== 0
):
self.exec_vault_command(command="vault token renew -i 10h", **kwargs)
logger.info("Success! Vault OIDC Logged-In and extended for 10 hours!")
# Fetching tokens
token = self.exec_vault_command(
"vault token lookup --format json", capture_output=True
).stdout
token = json.loads(str(token.decode('UTF-8')))['data']['id']
# Setting new token in env file
envdata = self.env_path.read_text()
envdata = re.sub(
'.*VAULT_TOKEN_FOR_DYNACONF=.*', f"VAULT_TOKEN_FOR_DYNACONF={token}", envdata
)
self.env_path.write_text(envdata)
logger.info(
"Success! New OIDC token added to .env file to access secrets from vault!"
)

def logout(self):
# Teardown - Setting dymmy token in env file
envdata = self.env_path.read_text()
envdata = re.sub(
'.*VAULT_TOKEN_FOR_DYNACONF=.*', "# VAULT_TOKEN_FOR_DYNACONF=myroot", envdata
)
self.env_path.write_text(envdata)
self.exec_vault_command('vault token revoke -self')
logger.info("Success! OIDC token removed from Env file successfully!")

def status(self, **kwargs):
vstatus = self.exec_vault_command('vault token lookup', **kwargs)
if vstatus.returncode == 0:
logger.info(str(vstatus.stdout.decode('UTF-8')))
return vstatus

def __enter__(self):
self.setup()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.teardown()
87 changes: 8 additions & 79 deletions scripts/vault_login.py
Original file line number Diff line number Diff line change
@@ -1,85 +1,14 @@
#!/usr/bin/env python
# This Enables and Disables individuals OIDC token to access secrets from vault
import json
import os
from pathlib import Path
import re
import subprocess
import sys

from robottelo.constants import Colored
from robottelo.utils import export_vault_env_vars

HELP_TEXT = (
"Vault CLI in not installed in your system, "
"refer link https://learn.hashicorp.com/tutorials/vault/getting-started-install to "
"install vault CLI as per your system spec!"
)


def _vault_command(command: str):
vcommand = subprocess.run(command, capture_output=True, shell=True)
if vcommand.returncode != 0:
verror = str(vcommand.stderr)
if vcommand.returncode == 127:
print(f"{Colored.REDDARK}Error! {HELP_TEXT}")
sys.exit(1)
elif 'Error revoking token' in verror:
print(f"{Colored.GREEN}Token is alredy revoked!")
sys.exit(0)
elif 'Error looking up token' in verror:
print(f"{Colored.YELLOW}Warning! Vault not logged in, please run 'make vault-login'!")
sys.exit(2)
else:
print(f"{Colored.REDDARK}Error! {verror}")
sys.exit(1)
return vcommand


def _vault_login(root_path, envdata):
print(
f"{Colored.WHITELIGHT}Warning! The browser is about to open for vault OIDC login, "
"close the tab once the sign-in is done!"
)
if _vault_command(command="vault login -method=oidc").returncode == 0:
_vault_command(command="vault token renew -i 10h")
print(f"{Colored.GREEN}Success! Vault OIDC Logged-In and extended for 10 hours!")
# Fetching token
token = _vault_command("vault token lookup --format json").stdout
token = json.loads(str(token.decode('UTF-8')))['data']['id']
# Setting new token in env file
envdata = re.sub('.*VAULT_TOKEN_FOR_DYNACONF=.*', f"VAULT_TOKEN_FOR_DYNACONF={token}", envdata)
with open(root_path, 'w') as envfile:
envfile.write(envdata)
print(
f"{Colored.GREEN}Success! New OIDC token added to .env file to access secrets from vault!"
)


def _vault_logout(root_path, envdata):
# Teardown - Setting dymmy token in env file
envdata = re.sub('.*VAULT_TOKEN_FOR_DYNACONF=.*', "# VAULT_TOKEN_FOR_DYNACONF=myroot", envdata)
with open(root_path, 'w') as envfile:
envfile.write(envdata)
_vault_command('vault token revoke -self')
print(f"{Colored.GREEN}Success! OIDC token removed from Env file successfully!")


def _vault_status():
vstatus = _vault_command('vault token lookup')
if vstatus.returncode == 0:
print(str(vstatus.stdout.decode('UTF-8')))

from robottelo.utils.vault import Vault

if __name__ == '__main__':
root_path = Path('.env')
envdata = root_path.read_text()
export_vault_env_vars(envdata=envdata)
if sys.argv[-1] == '--login':
_vault_login(root_path, envdata)
elif sys.argv[-1] == '--status':
_vault_status()
else:
_vault_logout(root_path, envdata)
# Unsetting VAULT URL
del os.environ['VAULT_ADDR']
with Vault() as vclient:
if sys.argv[-1] == '--login':
vclient.login()
elif sys.argv[-1] == '--status':
vclient.status()
else:
vclient.logout()
Loading