Executor improvements Part 01 #1924

Open · wants to merge 5 commits into `develop`

9 changes: 7 additions & 2 deletions CHANGELOG.md
@@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [UNRELEASED]

### Changed

- Modified the executor core to store and read Terraform state files from the **.config/executor_plugins** folder
- Clean up any partially deployed resources when **deploy up** fails

### Operations

- Fixed nightly workflow's calling of other workflows.
@@ -349,7 +354,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Operations

- Respecting node version as specified in `.nvmrc` file for tests workflow
- Bumped versions in pre-commit config
- Added prettier for markdown files.
- Reduce the number of pinned version numbers in the `setup.py`, `requirements.txt`, and `requirements-client.txt`
@@ -5260,4 +5265,4 @@ Installed executor plugins don't have to be referred to by their full module name

- CHANGELOG.md to track changes (this file).
- Semantic versioning in VERSION.
- CI pipeline job to enforce versioning.
76 changes: 58 additions & 18 deletions covalent/cloud_resource_manager/core.py
@@ -27,13 +27,15 @@
from typing import Any, Callable, Dict, List, Optional, Sequence, Union

from covalent._shared_files.config import set_config
from covalent._shared_files.defaults import get_default_sdk_config
from covalent.executor import _executor_manager

logger = logging.getLogger()
logger.setLevel(logging.ERROR)
handler = logging.StreamHandler(sys.stderr)
logger.addHandler(handler)
logger.propagate = False
sdk_constants = get_default_sdk_config()
**Member comment:** We should use `get_config` instead of directly importing the defaults, since they might've been changed by the user in the config file directly. Using `get_config` with specifically what is needed (instead of the entire SDK config) will take that into consideration as well.
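
A minimal sketch of what the suggested change could look like; the `sdk.executor_dir` key is an assumption about where this value lives in the config, not something confirmed by the diff:

```python
# Sketch only: assumes the executor directory is exposed under the
# "sdk.executor_dir" config key (unverified).
from covalent._shared_files.config import get_config

# get_config reads the current config file, so user overrides are
# respected, unlike importing the shipped defaults.
executor_dir = get_config("sdk.executor_dir")
```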



def get_executor_module(executor_name: str) -> ModuleType:
Expand Down Expand Up @@ -140,6 +142,7 @@ def __init__(
options: Optional[Dict[str, str]] = None,
):
self.executor_name = executor_name
self.executor_tfstate_path = self._get_tf_state_path(sdk_constants["executor_dir"])
self.executor_tf_path = str(
Path(executor_module_path).expanduser().resolve() / "assets" / "infra"
)
@@ -163,7 +166,7 @@ def __init__(

self._terraform_log_env_vars = {
"TF_LOG": "ERROR",
"TF_LOG_PATH": os.path.join(self.executor_tf_path, "terraform-error.log"),
"TF_LOG_PATH": os.path.join(self.executor_tfstate_path, "terraform-error.log"),
}

def _poll_process(self, process: subprocess.Popen, print_callback: Callable) -> int:
@@ -190,7 +193,9 @@ def _parse_terraform_error_log(self) -> List[str]:
List of lines in the terraform error log.

"""
with open(Path(self.executor_tf_path) / "terraform-error.log", "r", encoding="UTF-8") as f:
with open(
Path(self.executor_tfstate_path) / "terraform-error.log", "r", encoding="UTF-8"
) as f:
lines = f.readlines()
for _, line in enumerate(lines):
error_index = line.strip().find("error:")
@@ -199,7 +204,7 @@
logger.error(error_message)
return lines

def _terraform_error_validator(self, tfstate_path: str) -> bool:
def _terraform_error_validator(self, tfstate_path: str) -> str:
"""
Terraform error validator: checks whether a terraform-error.log file exists and validates its last line.
Args: None
@@ -209,13 +214,13 @@
down - if terraform-error.log is empty and the tfstate file does not exist.
*down - if terraform-error.log is not empty and its last line is 'On destroy'.
"""
tf_error_file = os.path.join(self.executor_tf_path, "terraform-error.log")
tf_error_file = os.path.join(self.executor_tfstate_path, "terraform-error.log")
if os.path.exists(tf_error_file) and os.path.getsize(tf_error_file) > 0:
with open(tf_error_file, "r", encoding="UTF-8") as error_file:
indicator = error_file.readlines()[-1]
if indicator == "On deploy":
return "*up"
elif indicator == "On destroy":
if indicator == "On destroy":
return "*down"
return "up" if os.path.exists(tfstate_path) else "down"

@@ -253,7 +258,7 @@ def _log_error_msg(self, cmd) -> None:
Args: cmd: str - the terraform command whose failure should be logged.
"""
with open(
Path(self.executor_tf_path) / "terraform-error.log", "a", encoding="UTF-8"
Path(self.executor_tfstate_path) / "terraform-error.log", "a", encoding="UTF-8"
) as file:
if any(tf_cmd in cmd for tf_cmd in ["init", "plan", "apply"]):
file.write("\nOn deploy")
@@ -305,6 +310,9 @@ def _run_in_subprocess(

if returncode != 0:
self._log_error_msg(cmd=cmd)
if "terraform apply" in cmd:
logger.error("Deployment interrupted. Rolling back the deployed resources...")
self.down(print_callback=print_callback)

_, stderr = proc.communicate()
raise subprocess.CalledProcessError(
@@ -389,7 +397,24 @@ def _get_tf_statefile_path(self) -> str:

"""
# Saving in a directory which doesn't get deleted on purge
return str(Path(self.executor_tf_path) / "terraform.tfstate")
return str(Path(self.executor_tfstate_path) / "terraform.tfstate")
**Member comment** (quoting "Saving in a directory which doesn't get deleted on purge"): `self.executor_tfstate_path` seems to point to `~/.config/covalent/executor_plugins/*`; this WILL get deleted by purge. We need to add an exception in the purge command to prevent deletion of these state files.
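
A hedged sketch of the kind of purge-time exception being requested; the real purge command lives in the Covalent CLI, so the function name and directory layout here are assumptions:

```python
# Hypothetical purge helper; names and layout are illustrative only.
import shutil
from pathlib import Path

PRESERVED_DIRS = {"executor_plugins"}  # holds terraform state files


def purge_config_dir(config_dir: str) -> None:
    """Remove everything under the config dir except preserved folders."""
    for entry in Path(config_dir).expanduser().iterdir():
        if entry.name in PRESERVED_DIRS:
            continue  # keep executor state across purges
        if entry.is_dir():
            shutil.rmtree(entry)
        else:
            entry.unlink()
```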


def _get_tf_state_path(self, parent_path) -> str:
"""
Get the directory where terraform state files are stored

Args:
parent_path: Parent directory under which the state directory is created

Returns:
Path to the terraform state directory

"""
# Saving in a directory which doesn't get deleted on purge
state_path = os.path.join(parent_path, self.executor_name)
if not os.path.exists(state_path):
os.makedirs(state_path)
**Member comment** (on lines +413 to +416): Same comment as above. One minor thing: if possible, let's try to use pathlib's `Path` instead of `os.path` for joining paths, checking existence, checking whether something is a directory, etc.

return state_path
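
For reference, a minimal pathlib variant of the method above, along the lines the reviewer suggests; behavior is intended to match the `os.path` version:

```python
# Sketch of the reviewer's pathlib suggestion; equivalent behavior to
# the os.path-based implementation above.
from pathlib import Path


def _get_tf_state_path(self, parent_path: str) -> str:
    state_path = Path(parent_path) / self.executor_name
    state_path.mkdir(parents=True, exist_ok=True)  # create if missing
    return str(state_path)
```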

def up(self, print_callback: Callable, dry_run: bool = True) -> None:
"""
@@ -404,12 +429,29 @@ def up(self, print_callback: Callable, dry_run: bool = True) -> None:
"""
terraform = self._get_tf_path()
self._validation_docker()
tfvars_file = Path(self.executor_tf_path) / "terraform.tfvars"
tfvars_file = Path(self.executor_tfstate_path) / "terraform.tfvars"
tf_executor_config_file = Path(self.executor_tf_path) / f"{self.executor_name}.conf"

tf_init = " ".join([terraform, "init"])
tf_plan = " ".join([terraform, "plan", "-out", "tf.plan"])
tf_apply = " ".join([terraform, "apply", "tf.plan"])
tf_plan = " ".join(
[
terraform,
"plan",
"--var-file",
f"{tfvars_file}",
"-out",
f"{self.executor_tfstate_path}/tf.plan",
]
)
tf_apply = " ".join(
[
terraform,
"apply",
"-state-out",
self._get_tf_statefile_path(),
f"{self.executor_tfstate_path}/tf.plan",
]
)
terraform_log_file = self._terraform_log_env_vars["TF_LOG_PATH"]

if Path(terraform_log_file).exists():
@@ -431,7 +473,6 @@ def up(self, print_callback: Callable, dry_run: bool = True) -> None:
for key, value in infra_settings.items():
if "default" in value:
tf_vars_env_dict[f"TF_VAR_{key}"] = value["default"]

if value["default"]:
f.write(f'{key}={self._convert_to_tfvar(value["default"])}\n')

@@ -480,7 +521,7 @@ def down(self, print_callback: Callable) -> None:
terraform = self._get_tf_path()
self._validation_docker()
tf_state_file = self._get_tf_statefile_path()
tfvars_file = Path(self.executor_tf_path) / "terraform.tfvars"
tfvars_file = Path(self.executor_tfstate_path) / "terraform.tfvars"
terraform_log_file = self._terraform_log_env_vars["TF_LOG_PATH"]

tf_destroy = " ".join(
@@ -490,6 +531,8 @@
f"TF_LOG_PATH={terraform_log_file}",
terraform,
"destroy",
"--state",
self._get_tf_statefile_path(),
"-auto-approve",
]
)
@@ -506,9 +549,6 @@
if Path(tfvars_file).exists():
Path(tfvars_file).unlink()

if Path(terraform_log_file).exists() and os.path.getsize(terraform_log_file) == 0:
Path(terraform_log_file).unlink()

if Path(tf_state_file).exists():
Path(tf_state_file).unlink()
if Path(f"{tf_state_file}.backup").exists():
@@ -531,9 +571,9 @@ def status(self) -> None:
"""
terraform = self._get_tf_path()
self._validation_docker()
tf_state_file = self._get_tf_statefile_path()

tf_state = " ".join([terraform, "state", "list", f"-state={tf_state_file}"])
tf_state = " ".join(
[terraform, "state", "list", f"-state={self._get_tf_statefile_path()}"]
)

# Run `terraform state list`
return self._run_in_subprocess(cmd=tf_state, env_vars=self._terraform_log_env_vars)