forked from SatelliteQE/robottelo
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Pytest verifies and log-in into vault (SatelliteQE#12378)
* Pytest verifies and logs in into vault * New vault lib utils for both * PR checks without vault login
- Loading branch information
Showing
6 changed files
with
141 additions
and
103 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
"""Plugin enables pytest to notify and update the requirements""" | ||
import subprocess | ||
|
||
from robottelo.utils.vault import Vault | ||
|
||
|
||
def pytest_addoption(parser): | ||
"""Options to allow user to update the requirements""" | ||
with Vault() as vclient: | ||
vclient.login(stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
"""Hashicorp Vault Utils where vault CLI is wrapped to perform vault operations""" | ||
import json | ||
import os | ||
import re | ||
import subprocess | ||
import sys | ||
|
||
from robottelo.exceptions import InvalidVaultURLForOIDC | ||
from robottelo.logging import logger, robottelo_root_dir | ||
|
||
|
||
class Vault: | ||
|
||
HELP_TEXT = ( | ||
"Vault CLI in not installed in your system, " | ||
"refer link https://learn.hashicorp.com/tutorials/vault/getting-started-install to " | ||
"install vault CLI as per your system spec!" | ||
) | ||
|
||
def __init__(self, env_file='.env'): | ||
self.env_path = robottelo_root_dir.joinpath(env_file) | ||
|
||
def setup(self): | ||
self.export_vault_addr() | ||
|
||
def teardown(self): | ||
del os.environ['VAULT_ADDR'] | ||
|
||
def export_vault_addr(self): | ||
envdata = self.env_path.read_text() | ||
vaulturl = re.findall('VAULT_URL_FOR_DYNACONF=(.*)', envdata)[0] | ||
|
||
# Set Vault CLI Env Var | ||
os.environ['VAULT_ADDR'] = vaulturl | ||
|
||
# Dynaconf Vault Env Vars | ||
if re.findall('VAULT_ENABLED_FOR_DYNACONF=(.*)', envdata)[0] == 'true': | ||
if 'localhost:8200' in vaulturl: | ||
raise InvalidVaultURLForOIDC( | ||
f"{vaulturl} doesnt supports OIDC login," | ||
"please change url to corp vault in env file!" | ||
) | ||
|
||
def exec_vault_command(self, command: str, **kwargs): | ||
"""A wrapper to execute the vault CLI commands | ||
:param comamnd str: The vault CLI command | ||
:param kwargs dict: Arguments to the subprocess run command to customize the run behavior | ||
""" | ||
vcommand = subprocess.run(command, shell=True, **kwargs) # capture_output=True | ||
if vcommand.returncode != 0: | ||
verror = str(vcommand.stderr) | ||
if vcommand.returncode == 127: | ||
logger.error(f"Error! {self.HELP_TEXT}") | ||
sys.exit(1) | ||
if vcommand.stderr: | ||
if 'Error revoking token' in verror: | ||
logger.info("Token is alredy revoked!") | ||
elif 'Error looking up token' in verror: | ||
logger.warning("Warning! Vault not logged in!") | ||
else: | ||
logger.error(f"Error! {verror}") | ||
return vcommand | ||
|
||
def login(self, **kwargs): | ||
if 'VAULT_SECRET_ID_FOR_DYNACONF' not in os.environ: | ||
if self.status(**kwargs).returncode != 0: | ||
logger.warning( | ||
"Warning! The browser is about to open for vault OIDC login, " | ||
"close the tab once the sign-in is done!" | ||
) | ||
if ( | ||
self.exec_vault_command(command="vault login -method=oidc", **kwargs).returncode | ||
== 0 | ||
): | ||
self.exec_vault_command(command="vault token renew -i 10h", **kwargs) | ||
logger.info("Success! Vault OIDC Logged-In and extended for 10 hours!") | ||
# Fetching tokens | ||
token = self.exec_vault_command( | ||
"vault token lookup --format json", capture_output=True | ||
).stdout | ||
token = json.loads(str(token.decode('UTF-8')))['data']['id'] | ||
# Setting new token in env file | ||
envdata = self.env_path.read_text() | ||
envdata = re.sub( | ||
'.*VAULT_TOKEN_FOR_DYNACONF=.*', f"VAULT_TOKEN_FOR_DYNACONF={token}", envdata | ||
) | ||
self.env_path.write_text(envdata) | ||
logger.info( | ||
"Success! New OIDC token added to .env file to access secrets from vault!" | ||
) | ||
|
||
def logout(self): | ||
# Teardown - Setting dymmy token in env file | ||
envdata = self.env_path.read_text() | ||
envdata = re.sub( | ||
'.*VAULT_TOKEN_FOR_DYNACONF=.*', "# VAULT_TOKEN_FOR_DYNACONF=myroot", envdata | ||
) | ||
self.env_path.write_text(envdata) | ||
self.exec_vault_command('vault token revoke -self') | ||
logger.info("Success! OIDC token removed from Env file successfully!") | ||
|
||
def status(self, **kwargs): | ||
vstatus = self.exec_vault_command('vault token lookup', **kwargs) | ||
if vstatus.returncode == 0: | ||
logger.info(str(vstatus.stdout.decode('UTF-8'))) | ||
return vstatus | ||
|
||
def __enter__(self): | ||
self.setup() | ||
return self | ||
|
||
def __exit__(self, exc_type, exc_val, exc_tb): | ||
self.teardown() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,86 +1,14 @@ | ||
#!/usr/bin/env python | ||
# This Enables and Disables individuals OIDC token to access secrets from vault | ||
import json | ||
import os | ||
import re | ||
import subprocess | ||
import sys | ||
from pathlib import Path | ||
|
||
from robottelo.constants import Colored | ||
from robottelo.utils import export_vault_env_vars | ||
|
||
|
||
HELP_TEXT = ( | ||
"Vault CLI in not installed in your system, " | ||
"refer link https://learn.hashicorp.com/tutorials/vault/getting-started-install to " | ||
"install vault CLI as per your system spec!" | ||
) | ||
|
||
|
||
def _vault_command(command: str): | ||
vcommand = subprocess.run(command, capture_output=True, shell=True) | ||
if vcommand.returncode != 0: | ||
verror = str(vcommand.stderr) | ||
if vcommand.returncode == 127: | ||
print(f"{Colored.REDDARK}Error! {HELP_TEXT}") | ||
sys.exit(1) | ||
elif 'Error revoking token' in verror: | ||
print(f"{Colored.GREEN}Token is alredy revoked!") | ||
sys.exit(0) | ||
elif 'Error looking up token' in verror: | ||
print(f"{Colored.YELLOW}Warning! Vault not logged in, please run 'make vault-login'!") | ||
sys.exit(2) | ||
else: | ||
print(f"{Colored.REDDARK}Error! {verror}") | ||
sys.exit(1) | ||
return vcommand | ||
|
||
|
||
def _vault_login(root_path, envdata): | ||
print( | ||
f"{Colored.WHITELIGHT}Warning! The browser is about to open for vault OIDC login, " | ||
"close the tab once the sign-in is done!" | ||
) | ||
if _vault_command(command="vault login -method=oidc").returncode == 0: | ||
_vault_command(command="vault token renew -i 10h") | ||
print(f"{Colored.GREEN}Success! Vault OIDC Logged-In and extended for 10 hours!") | ||
# Fetching token | ||
token = _vault_command("vault token lookup --format json").stdout | ||
token = json.loads(str(token.decode('UTF-8')))['data']['id'] | ||
# Setting new token in env file | ||
envdata = re.sub('.*VAULT_TOKEN_FOR_DYNACONF=.*', f"VAULT_TOKEN_FOR_DYNACONF={token}", envdata) | ||
with open(root_path, 'w') as envfile: | ||
envfile.write(envdata) | ||
print( | ||
f"{Colored.GREEN}Success! New OIDC token added to .env file to access secrets from vault!" | ||
) | ||
|
||
|
||
def _vault_logout(root_path, envdata): | ||
# Teardown - Setting dymmy token in env file | ||
envdata = re.sub('.*VAULT_TOKEN_FOR_DYNACONF=.*', "# VAULT_TOKEN_FOR_DYNACONF=myroot", envdata) | ||
with open(root_path, 'w') as envfile: | ||
envfile.write(envdata) | ||
_vault_command('vault token revoke -self') | ||
print(f"{Colored.GREEN}Success! OIDC token removed from Env file successfully!") | ||
|
||
|
||
def _vault_status(): | ||
vstatus = _vault_command('vault token lookup') | ||
if vstatus.returncode == 0: | ||
print(str(vstatus.stdout.decode('UTF-8'))) | ||
|
||
from robottelo.utils.vault import Vault | ||
|
||
if __name__ == '__main__': | ||
root_path = Path('.env') | ||
envdata = root_path.read_text() | ||
export_vault_env_vars(envdata=envdata) | ||
if sys.argv[-1] == '--login': | ||
_vault_login(root_path, envdata) | ||
elif sys.argv[-1] == '--status': | ||
_vault_status() | ||
else: | ||
_vault_logout(root_path, envdata) | ||
# Unsetting VAULT URL | ||
del os.environ['VAULT_ADDR'] | ||
with Vault() as vclient: | ||
if sys.argv[-1] == '--login': | ||
vclient.login() | ||
elif sys.argv[-1] == '--status': | ||
vclient.status() | ||
else: | ||
vclient.logout() |