Skip to content

Commit

Permalink
consolidate the logs of python script into container logs
Browse files Browse the repository at this point in the history
Signed-off-by: Harshad Reddy Nalla <[email protected]>
  • Loading branch information
harshad16 committed Jun 21, 2024
1 parent 8461fce commit 768e2c7
Show file tree
Hide file tree
Showing 8 changed files with 296 additions and 104 deletions.
50 changes: 37 additions & 13 deletions runtimes/datascience/ubi8-python-3.8/utils/bootstrapper.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copied from: https://github.com/elyra-ai/elyra/blob/main/elyra/kfp/bootstrapper.py
#
#
# Copyright 2018-2023 Elyra Authors
#
Expand Down Expand Up @@ -472,7 +472,7 @@ def execute(self) -> None:
"""Execute the Python script and upload results to object storage"""
python_script = os.path.basename(self.filepath)
python_script_name = python_script.replace(".py", "")
python_script_output = f"{python_script_name}.log"
# python_script_output = f"{python_script_name}.log"

try:
OpUtil.log_operation_info(
Expand All @@ -483,21 +483,32 @@ def execute(self) -> None:
run_args = ["python3", python_script]
if self.parameter_pass_method == "env":
self.set_parameters_in_env()

with open(python_script_output, "w") as log_file:
subprocess.run(run_args, stdout=log_file, stderr=subprocess.STDOUT, check=True)


logger.info("----------------------Python logs start----------------------")
# Removing support for the s3 storage of python script logs
# with open(python_script_output, "w") as log_file:
# process = subprocess.Popen(run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
process = subprocess.Popen(run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

for line in iter(process.stdout.readline, b''):
sys.stdout.write(line.decode())

process.stdout.close()
return_code = process.wait()
logger.info("----------------------Python logs ends----------------------")
if return_code:
raise subprocess.CalledProcessError(return_code, run_args)
duration = time.time() - t0
OpUtil.log_operation_info("python script execution completed", duration)

self.put_file_to_object_storage(python_script_output, python_script_output)
# self.put_file_to_object_storage(python_script_output, python_script_output)
self.process_outputs()
except Exception as ex:
# log in case of errors
logger.error(f"Unexpected error: {sys.exc_info()[0]}")
logger.error(f"Error details: {ex}")

self.put_file_to_object_storage(python_script_output, python_script_output)
# self.put_file_to_object_storage(python_script_output, python_script_output)
raise ex


Expand All @@ -508,7 +519,7 @@ def execute(self) -> None:
"""Execute the R script and upload results to object storage"""
r_script = os.path.basename(self.filepath)
r_script_name = r_script.replace(".r", "")
r_script_output = f"{r_script_name}.log"
# r_script_output = f"{r_script_name}.log"

try:
OpUtil.log_operation_info(f"executing R script using 'Rscript {r_script}' to '{r_script_output}'")
Expand All @@ -518,20 +529,32 @@ def execute(self) -> None:
if self.parameter_pass_method == "env":
self.set_parameters_in_env()

with open(r_script_output, "w") as log_file:
subprocess.run(run_args, stdout=log_file, stderr=subprocess.STDOUT, check=True)
logger.info("----------------------R script logs start----------------------")
# Removing support for the s3 storage of R script logs
# with open(r_script_output, "w") as log_file:
# process = subprocess.Popen(run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
process = subprocess.Popen(run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

for line in iter(process.stdout.readline, b''):
sys.stdout.write(line.decode())

process.stdout.close()
return_code = process.wait()
logger.info("----------------------R script logs ends----------------------")
if return_code:
raise subprocess.CalledProcessError(return_code, run_args)

duration = time.time() - t0
OpUtil.log_operation_info("R script execution completed", duration)

self.put_file_to_object_storage(r_script_output, r_script_output)
# self.put_file_to_object_storage(r_script_output, r_script_output)
self.process_outputs()
except Exception as ex:
# log in case of errors
logger.error(f"Unexpected error: {sys.exc_info()[0]}")
logger.error(f"Error details: {ex}")

self.put_file_to_object_storage(r_script_output, r_script_output)
# self.put_file_to_object_storage(r_script_output, r_script_output)
raise ex


Expand Down Expand Up @@ -727,6 +750,7 @@ def main():
input_params = OpUtil.parse_arguments(sys.argv[1:])
OpUtil.log_operation_info("starting operation")
t0 = time.time()
OpUtil.package_install(user_volume_path=input_params.get("user-volume-path"))

# Create the appropriate instance, process dependencies and execute the operation
file_op = FileOpBase.get_instance(**input_params)
Expand Down
50 changes: 37 additions & 13 deletions runtimes/datascience/ubi9-python-3.9/utils/bootstrapper.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copied from: https://github.com/elyra-ai/elyra/blob/main/elyra/kfp/bootstrapper.py
#
#
# Copyright 2018-2023 Elyra Authors
#
Expand Down Expand Up @@ -472,7 +472,7 @@ def execute(self) -> None:
"""Execute the Python script and upload results to object storage"""
python_script = os.path.basename(self.filepath)
python_script_name = python_script.replace(".py", "")
python_script_output = f"{python_script_name}.log"
# python_script_output = f"{python_script_name}.log"

try:
OpUtil.log_operation_info(
Expand All @@ -483,21 +483,32 @@ def execute(self) -> None:
run_args = ["python3", python_script]
if self.parameter_pass_method == "env":
self.set_parameters_in_env()

with open(python_script_output, "w") as log_file:
subprocess.run(run_args, stdout=log_file, stderr=subprocess.STDOUT, check=True)


logger.info("----------------------Python logs start----------------------")
# Removing support for the s3 storage of python script logs
# with open(python_script_output, "w") as log_file:
# process = subprocess.Popen(run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
process = subprocess.Popen(run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

for line in iter(process.stdout.readline, b''):
sys.stdout.write(line.decode())

process.stdout.close()
return_code = process.wait()
logger.info("----------------------Python logs ends----------------------")
if return_code:
raise subprocess.CalledProcessError(return_code, run_args)
duration = time.time() - t0
OpUtil.log_operation_info("python script execution completed", duration)

self.put_file_to_object_storage(python_script_output, python_script_output)
# self.put_file_to_object_storage(python_script_output, python_script_output)
self.process_outputs()
except Exception as ex:
# log in case of errors
logger.error(f"Unexpected error: {sys.exc_info()[0]}")
logger.error(f"Error details: {ex}")

self.put_file_to_object_storage(python_script_output, python_script_output)
# self.put_file_to_object_storage(python_script_output, python_script_output)
raise ex


Expand All @@ -508,7 +519,7 @@ def execute(self) -> None:
"""Execute the R script and upload results to object storage"""
r_script = os.path.basename(self.filepath)
r_script_name = r_script.replace(".r", "")
r_script_output = f"{r_script_name}.log"
# r_script_output = f"{r_script_name}.log"

try:
OpUtil.log_operation_info(f"executing R script using 'Rscript {r_script}' to '{r_script_output}'")
Expand All @@ -518,20 +529,32 @@ def execute(self) -> None:
if self.parameter_pass_method == "env":
self.set_parameters_in_env()

with open(r_script_output, "w") as log_file:
subprocess.run(run_args, stdout=log_file, stderr=subprocess.STDOUT, check=True)
logger.info("----------------------R script logs start----------------------")
# Removing support for the s3 storage of R script logs
# with open(r_script_output, "w") as log_file:
# process = subprocess.Popen(run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
process = subprocess.Popen(run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

for line in iter(process.stdout.readline, b''):
sys.stdout.write(line.decode())

process.stdout.close()
return_code = process.wait()
logger.info("----------------------R script logs ends----------------------")
if return_code:
raise subprocess.CalledProcessError(return_code, run_args)

duration = time.time() - t0
OpUtil.log_operation_info("R script execution completed", duration)

self.put_file_to_object_storage(r_script_output, r_script_output)
# self.put_file_to_object_storage(r_script_output, r_script_output)
self.process_outputs()
except Exception as ex:
# log in case of errors
logger.error(f"Unexpected error: {sys.exc_info()[0]}")
logger.error(f"Error details: {ex}")

self.put_file_to_object_storage(r_script_output, r_script_output)
# self.put_file_to_object_storage(r_script_output, r_script_output)
raise ex


Expand Down Expand Up @@ -727,6 +750,7 @@ def main():
input_params = OpUtil.parse_arguments(sys.argv[1:])
OpUtil.log_operation_info("starting operation")
t0 = time.time()
OpUtil.package_install(user_volume_path=input_params.get("user-volume-path"))

# Create the appropriate instance, process dependencies and execute the operation
file_op = FileOpBase.get_instance(**input_params)
Expand Down
50 changes: 37 additions & 13 deletions runtimes/minimal/ubi8-python-3.8/utils/bootstrapper.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copied from: https://github.com/elyra-ai/elyra/blob/main/elyra/kfp/bootstrapper.py
#
#
# Copyright 2018-2023 Elyra Authors
#
Expand Down Expand Up @@ -472,7 +472,7 @@ def execute(self) -> None:
"""Execute the Python script and upload results to object storage"""
python_script = os.path.basename(self.filepath)
python_script_name = python_script.replace(".py", "")
python_script_output = f"{python_script_name}.log"
# python_script_output = f"{python_script_name}.log"

try:
OpUtil.log_operation_info(
Expand All @@ -483,21 +483,32 @@ def execute(self) -> None:
run_args = ["python3", python_script]
if self.parameter_pass_method == "env":
self.set_parameters_in_env()

with open(python_script_output, "w") as log_file:
subprocess.run(run_args, stdout=log_file, stderr=subprocess.STDOUT, check=True)


logger.info("----------------------Python logs start----------------------")
# Removing support for the s3 storage of python script logs
# with open(python_script_output, "w") as log_file:
# process = subprocess.Popen(run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
process = subprocess.Popen(run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

for line in iter(process.stdout.readline, b''):
sys.stdout.write(line.decode())

process.stdout.close()
return_code = process.wait()
logger.info("----------------------Python logs ends----------------------")
if return_code:
raise subprocess.CalledProcessError(return_code, run_args)
duration = time.time() - t0
OpUtil.log_operation_info("python script execution completed", duration)

self.put_file_to_object_storage(python_script_output, python_script_output)
# self.put_file_to_object_storage(python_script_output, python_script_output)
self.process_outputs()
except Exception as ex:
# log in case of errors
logger.error(f"Unexpected error: {sys.exc_info()[0]}")
logger.error(f"Error details: {ex}")

self.put_file_to_object_storage(python_script_output, python_script_output)
# self.put_file_to_object_storage(python_script_output, python_script_output)
raise ex


Expand All @@ -508,7 +519,7 @@ def execute(self) -> None:
"""Execute the R script and upload results to object storage"""
r_script = os.path.basename(self.filepath)
r_script_name = r_script.replace(".r", "")
r_script_output = f"{r_script_name}.log"
# r_script_output = f"{r_script_name}.log"

try:
OpUtil.log_operation_info(f"executing R script using 'Rscript {r_script}' to '{r_script_output}'")
Expand All @@ -518,20 +529,32 @@ def execute(self) -> None:
if self.parameter_pass_method == "env":
self.set_parameters_in_env()

with open(r_script_output, "w") as log_file:
subprocess.run(run_args, stdout=log_file, stderr=subprocess.STDOUT, check=True)
logger.info("----------------------R script logs start----------------------")
# Removing support for the s3 storage of R script logs
# with open(r_script_output, "w") as log_file:
# process = subprocess.Popen(run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
process = subprocess.Popen(run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

for line in iter(process.stdout.readline, b''):
sys.stdout.write(line.decode())

process.stdout.close()
return_code = process.wait()
logger.info("----------------------R script logs ends----------------------")
if return_code:
raise subprocess.CalledProcessError(return_code, run_args)

duration = time.time() - t0
OpUtil.log_operation_info("R script execution completed", duration)

self.put_file_to_object_storage(r_script_output, r_script_output)
# self.put_file_to_object_storage(r_script_output, r_script_output)
self.process_outputs()
except Exception as ex:
# log in case of errors
logger.error(f"Unexpected error: {sys.exc_info()[0]}")
logger.error(f"Error details: {ex}")

self.put_file_to_object_storage(r_script_output, r_script_output)
# self.put_file_to_object_storage(r_script_output, r_script_output)
raise ex


Expand Down Expand Up @@ -727,6 +750,7 @@ def main():
input_params = OpUtil.parse_arguments(sys.argv[1:])
OpUtil.log_operation_info("starting operation")
t0 = time.time()
OpUtil.package_install(user_volume_path=input_params.get("user-volume-path"))

# Create the appropriate instance, process dependencies and execute the operation
file_op = FileOpBase.get_instance(**input_params)
Expand Down
Loading

0 comments on commit 768e2c7

Please sign in to comment.