From 32002377973ea3bf0bb7bfc174cec0abbeec5d8a Mon Sep 17 00:00:00 2001 From: Harshad Reddy Nalla Date: Mon, 17 Jun 2024 19:38:56 -0400 Subject: [PATCH] consolidate the logs of python script into container logs Signed-off-by: Harshad Reddy Nalla --- .../ubi8-python-3.8/utils/bootstrapper.py | 131 ++++++++++++------ .../ubi9-python-3.9/utils/bootstrapper.py | 131 ++++++++++++------ .../ubi8-python-3.8/utils/bootstrapper.py | 131 ++++++++++++------ .../ubi9-python-3.9/utils/bootstrapper.py | 131 ++++++++++++------ .../ubi8-python-3.8/utils/bootstrapper.py | 131 ++++++++++++------ .../ubi9-python-3.9/utils/bootstrapper.py | 131 ++++++++++++------ .../ubi8-python-3.8/utils/bootstrapper.py | 131 ++++++++++++------ .../ubi9-python-3.9/utils/bootstrapper.py | 131 ++++++++++++------ 8 files changed, 720 insertions(+), 328 deletions(-) diff --git a/runtimes/datascience/ubi8-python-3.8/utils/bootstrapper.py b/runtimes/datascience/ubi8-python-3.8/utils/bootstrapper.py index 723e9b975..8a053a129 100644 --- a/runtimes/datascience/ubi8-python-3.8/utils/bootstrapper.py +++ b/runtimes/datascience/ubi8-python-3.8/utils/bootstrapper.py @@ -46,7 +46,8 @@ F = TypeVar("F", bound="FileOpBase") logger = logging.getLogger("elyra") -enable_pipeline_info = os.getenv("ELYRA_ENABLE_PIPELINE_INFO", "true").lower() == "true" +enable_pipeline_info = os.getenv( + "ELYRA_ENABLE_PIPELINE_INFO", "true").lower() == "true" pipeline_name = None # global used in formatted logging operation_name = None # global used in formatted logging @@ -81,8 +82,10 @@ def __init__(self, **kwargs: Any) -> None: self.cos_endpoint = urlparse(self.input_params.get("cos-endpoint")) self.cos_bucket = self.input_params.get("cos-bucket") - self.parameter_pass_method = self.input_params.get("parameter_pass_method") - self.pipeline_param_dict = self.convert_param_str_to_dict(self.input_params.get("pipeline_parameters")) + self.parameter_pass_method = self.input_params.get( + "parameter_pass_method") + self.pipeline_param_dict = self.convert_param_str_to_dict( + self.input_params.get("pipeline_parameters")) # Infer secure from the endpoint's scheme. self.secure = self.cos_endpoint.scheme == "https" @@ -104,12 +107,14 @@ def __init__(self, **kwargs: Any) -> None: ) # get minio client - self.cos_client = minio.Minio(self.cos_endpoint.netloc, secure=self.secure, credentials=cred_provider) + self.cos_client = minio.Minio( + self.cos_endpoint.netloc, secure=self.secure, credentials=cred_provider) @abstractmethod def execute(self) -> None: """Execute the operation relative to derived class""" - raise NotImplementedError("Method 'execute()' must be implemented by subclasses!") + raise NotImplementedError( + "Method 'execute()' must be implemented by subclasses!") def process_dependencies(self) -> None: """Process dependencies @@ -180,7 +185,8 @@ def process_metrics_and_metadata(self) -> None: # output_path doesn't meet the requirements # treat this as a non-fatal error and log a warning logger.warning(f'Cannot create files in "{output_path}".') - OpUtil.log_operation_info("Aborted metrics and metadata processing", time.time() - t0) + OpUtil.log_operation_info( + "Aborted metrics and metadata processing", time.time() - t0) return # Name of the proprietary KFP UI metadata file. @@ -225,11 +231,13 @@ def process_metrics_and_metadata(self) -> None: except ValueError as ve: # The file content could not be parsed. Log a warning # and treat this as a non-fatal error. 
- logger.warning(f"Ignoring incompatible {str(src)} produced by {self.filepath}: {ve} {str(ve)}") + logger.warning( + f"Ignoring incompatible {str(src)} produced by {self.filepath}: {ve} {str(ve)}") except Exception as ex: # Something is wrong with the user-generated metadata file. # Log a warning and treat this as a non-fatal error. - logger.warning(f"Error processing {str(src)} produced by {self.filepath}: {ex} {str(ex)}") + logger.warning( + f"Error processing {str(src)} produced by {self.filepath}: {ex} {str(ex)}") # # Augment kfp_ui_metadata_filename with Elyra-specific information: @@ -251,7 +259,8 @@ def process_metrics_and_metadata(self) -> None: # Define HREF for COS bucket: # // bucket_url = urljoin( - urlunparse(self.cos_endpoint), f"{self.cos_bucket}/{self.input_params.get('cos-directory', '')}/" + urlunparse( + self.cos_endpoint), f"{self.cos_bucket}/{self.input_params.get('cos-directory', '')}/" ) # add Elyra metadata to 'outputs' @@ -292,7 +301,8 @@ def get_file_from_object_storage(self, file_to_get: str) -> None: object_to_get = self.get_object_storage_filename(file_to_get) t0 = time.time() - self.cos_client.fget_object(bucket_name=self.cos_bucket, object_name=object_to_get, file_path=file_to_get) + self.cos_client.fget_object( + bucket_name=self.cos_bucket, object_name=object_to_get, file_path=file_to_get) duration = time.time() - t0 OpUtil.log_operation_info( f"downloaded {file_to_get} from bucket: {self.cos_bucket}, object: {object_to_get}", duration @@ -311,7 +321,8 @@ def put_file_to_object_storage(self, file_to_upload: str, object_name: Optional[ object_to_upload = self.get_object_storage_filename(object_to_upload) t0 = time.time() - self.cos_client.fput_object(bucket_name=self.cos_bucket, object_name=object_to_upload, file_path=file_to_upload) + self.cos_client.fput_object( + bucket_name=self.cos_bucket, object_name=object_to_upload, file_path=file_to_upload) duration = time.time() - t0 OpUtil.log_operation_info( f"uploaded {file_to_upload} to bucket: {self.cos_bucket} object: {object_to_upload}", duration @@ -366,7 +377,8 @@ def execute(self) -> None: notebook_html = f"{notebook_name}.html" try: - OpUtil.log_operation_info(f"executing notebook using 'papermill {notebook} {notebook_output}'") + OpUtil.log_operation_info( + f"executing notebook using 'papermill {notebook} {notebook_output}'") t0 = time.time() # Include kernel selection in execution time kernel_name = NotebookFileOp.find_best_kernel(notebook) @@ -377,11 +389,13 @@ def execute(self) -> None: import papermill - papermill.execute_notebook(notebook, notebook_output, kernel_name=kernel_name, **kwargs) + papermill.execute_notebook( + notebook, notebook_output, kernel_name=kernel_name, **kwargs) duration = time.time() - t0 OpUtil.log_operation_info("notebook execution completed", duration) - NotebookFileOp.convert_notebook_to_html(notebook_output, notebook_html) + NotebookFileOp.convert_notebook_to_html( + notebook_output, notebook_html) self.put_file_to_object_storage(notebook_output, notebook) self.put_file_to_object_storage(notebook_html) self.process_outputs() @@ -389,7 +403,8 @@ def execute(self) -> None: # log in case of errors logger.error(f"Unexpected error: {sys.exc_info()[0]}") - NotebookFileOp.convert_notebook_to_html(notebook_output, notebook_html) + NotebookFileOp.convert_notebook_to_html( + notebook_output, notebook_html) self.put_file_to_object_storage(notebook_output, notebook) self.put_file_to_object_storage(notebook_html) raise ex @@ -405,7 +420,8 @@ def 
convert_notebook_to_html(notebook_file: str, html_file: str) -> str: import nbconvert import nbformat - OpUtil.log_operation_info(f"converting from {notebook_file} to {html_file}") + OpUtil.log_operation_info( + f"converting from {notebook_file} to {html_file}") t0 = time.time() nb = nbformat.read(notebook_file, as_version=4) html_exporter = nbconvert.HTMLExporter() @@ -415,7 +431,8 @@ def convert_notebook_to_html(notebook_file: str, html_file: str) -> str: f.close() duration = time.time() - t0 - OpUtil.log_operation_info(f"{notebook_file} converted to {html_file}", duration) + OpUtil.log_operation_info( + f"{notebook_file} converted to {html_file}", duration) return html_file @staticmethod @@ -476,7 +493,7 @@ def execute(self) -> None: try: OpUtil.log_operation_info( - f"executing python script using 'python3 {python_script}' to '{python_script_output}'" + f"executing python script using 'python3 {python_script}'" ) t0 = time.time() @@ -484,20 +501,30 @@ def execute(self) -> None: if self.parameter_pass_method == "env": self.set_parameters_in_env() - with open(python_script_output, "w") as log_file: - subprocess.run(run_args, stdout=log_file, stderr=subprocess.STDOUT, check=True) + logger.info( + "----------------------Python logs start----------------------") + process = subprocess.Popen( + run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + for line in iter(process.stdout.readline, b''): + sys.stdout.write(line.decode()) + + process.stdout.close() + return_code = process.wait() + logger.info( + "----------------------Python logs ends----------------------") + if return_code: + raise subprocess.CalledProcessError(return_code, run_args) duration = time.time() - t0 - OpUtil.log_operation_info("python script execution completed", duration) + OpUtil.log_operation_info( + "python script execution completed", duration) - self.put_file_to_object_storage(python_script_output, python_script_output) self.process_outputs() except Exception as ex: # log in case of errors logger.error(f"Unexpected error: {sys.exc_info()[0]}") logger.error(f"Error details: {ex}") - self.put_file_to_object_storage(python_script_output, python_script_output) raise ex @@ -511,27 +538,37 @@ def execute(self) -> None: r_script_output = f"{r_script_name}.log" try: - OpUtil.log_operation_info(f"executing R script using 'Rscript {r_script}' to '{r_script_output}'") + OpUtil.log_operation_info( + f"executing R script using 'Rscript {r_script}' to '{r_script_output}'") t0 = time.time() run_args = ["Rscript", r_script] if self.parameter_pass_method == "env": self.set_parameters_in_env() - with open(r_script_output, "w") as log_file: - subprocess.run(run_args, stdout=log_file, stderr=subprocess.STDOUT, check=True) + logger.info( + "----------------------R Script logs start----------------------") + process = subprocess.Popen( + run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + for line in iter(process.stdout.readline, b''): + sys.stdout.write(line.decode()) + + process.stdout.close() + return_code = process.wait() + logger.info( + "----------------------R Script logs ends----------------------") + if return_code: + raise subprocess.CalledProcessError(return_code, run_args) duration = time.time() - t0 OpUtil.log_operation_info("R script execution completed", duration) - self.put_file_to_object_storage(r_script_output, r_script_output) self.process_outputs() except Exception as ex: # log in case of errors logger.error(f"Unexpected error: {sys.exc_info()[0]}") logger.error(f"Error details: {ex}") - 
self.put_file_to_object_storage(r_script_output, r_script_output) raise ex @@ -566,7 +603,8 @@ def package_install(cls, user_volume_path) -> None: ) continue if version.Version(ver) > version.Version(current_packages[package]): - logger.info(f"Updating {package} package from version {current_packages[package]} to {ver}...") + logger.info( + f"Updating {package} package from version {current_packages[package]} to {ver}...") to_install_list.append(f"{package}=={ver}") elif version.Version(ver) < version.Version(current_packages[package]): logger.info( @@ -574,7 +612,8 @@ def package_install(cls, user_volume_path) -> None: f"already installed. Skipping..." ) else: - logger.info(f"Package not found. Installing {package} package with version {ver}...") + logger.info( + f"Package not found. Installing {package} package with version {ver}...") to_install_list.append(f"{package}=={ver}") if to_install_list: @@ -582,7 +621,8 @@ def package_install(cls, user_volume_path) -> None: to_install_list.insert(0, f"--target={user_volume_path}") to_install_list.append("--no-cache-dir") - subprocess.run([sys.executable, "-m", "pip", "install"] + to_install_list, check=True) + subprocess.run([sys.executable, "-m", "pip", + "install"] + to_install_list, check=True) if user_volume_path: os.environ["PIP_CONFIG_FILE"] = f"{user_volume_path}/pip.conf" @@ -609,13 +649,17 @@ def package_list_to_dict(cls, filename: str) -> dict: for line in fh: if line[0] != "#": if " @ " in line: - package_name, package_version = line.strip("\n").split(sep=" @ ") + package_name, package_version = line.strip( + "\n").split(sep=" @ ") elif "===" in line: - package_name, package_version = line.strip("\n").split(sep="===") + package_name, package_version = line.strip( + "\n").split(sep="===") elif "==" in line: - package_name, package_version = line.strip("\n").split(sep="==") + package_name, package_version = line.strip( + "\n").split(sep="==") elif line.startswith("-e ") or line.startswith("--editable "): - package_name = line.strip("\n").replace("-e ", "").replace("--editable ", "") + package_name = line.strip("\n").replace( + "-e ", "").replace("--editable ", "") if "#egg=" in package_name: # editable package from version control system package_name = package_name.split("=")[-1] elif "/" in package_name: # editable package from local directory @@ -657,9 +701,12 @@ def parse_arguments(cls, args) -> dict: help="Archive containing notebook and dependency artifacts", required=True, ) - parser.add_argument("-f", "--file", dest="filepath", help="File to execute", required=True) - parser.add_argument("-o", "--outputs", dest="outputs", help="Files to output to object store", required=False) - parser.add_argument("-i", "--inputs", dest="inputs", help="Files to pull in from parent node", required=False) + parser.add_argument("-f", "--file", dest="filepath", + help="File to execute", required=True) + parser.add_argument("-o", "--outputs", dest="outputs", + help="Files to output to object store", required=False) + parser.add_argument("-i", "--inputs", dest="inputs", + help="Files to pull in from parent node", required=False) parser.add_argument( "-p", "--user-volume-path", @@ -694,7 +741,8 @@ def parse_arguments(cls, args) -> dict: # set pipeline name as global pipeline_name = parsed_args.get("pipeline-name") # operation/node name is the basename of the non-suffixed filepath, set as global - operation_name = os.path.basename(os.path.splitext(parsed_args.get("filepath"))[0]) + operation_name = os.path.basename( + 
os.path.splitext(parsed_args.get("filepath"))[0]) return parsed_args @@ -715,7 +763,8 @@ def log_operation_info(cls, action_clause: str, duration_secs: Optional[float] = global pipeline_name, operation_name if enable_pipeline_info: duration_clause = f"({duration_secs:.3f} secs)" if duration_secs else "" - logger.info(f"'{pipeline_name}':'{operation_name}' - {action_clause} {duration_clause}") + logger.info( + f"'{pipeline_name}':'{operation_name}' - {action_clause} {duration_clause}") def main(): @@ -743,4 +792,4 @@ def main(): if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/runtimes/datascience/ubi9-python-3.9/utils/bootstrapper.py b/runtimes/datascience/ubi9-python-3.9/utils/bootstrapper.py index 723e9b975..8a053a129 100644 --- a/runtimes/datascience/ubi9-python-3.9/utils/bootstrapper.py +++ b/runtimes/datascience/ubi9-python-3.9/utils/bootstrapper.py @@ -46,7 +46,8 @@ F = TypeVar("F", bound="FileOpBase") logger = logging.getLogger("elyra") -enable_pipeline_info = os.getenv("ELYRA_ENABLE_PIPELINE_INFO", "true").lower() == "true" +enable_pipeline_info = os.getenv( + "ELYRA_ENABLE_PIPELINE_INFO", "true").lower() == "true" pipeline_name = None # global used in formatted logging operation_name = None # global used in formatted logging @@ -81,8 +82,10 @@ def __init__(self, **kwargs: Any) -> None: self.cos_endpoint = urlparse(self.input_params.get("cos-endpoint")) self.cos_bucket = self.input_params.get("cos-bucket") - self.parameter_pass_method = self.input_params.get("parameter_pass_method") - self.pipeline_param_dict = self.convert_param_str_to_dict(self.input_params.get("pipeline_parameters")) + self.parameter_pass_method = self.input_params.get( + "parameter_pass_method") + self.pipeline_param_dict = self.convert_param_str_to_dict( + self.input_params.get("pipeline_parameters")) # Infer secure from the endpoint's scheme. self.secure = self.cos_endpoint.scheme == "https" @@ -104,12 +107,14 @@ def __init__(self, **kwargs: Any) -> None: ) # get minio client - self.cos_client = minio.Minio(self.cos_endpoint.netloc, secure=self.secure, credentials=cred_provider) + self.cos_client = minio.Minio( + self.cos_endpoint.netloc, secure=self.secure, credentials=cred_provider) @abstractmethod def execute(self) -> None: """Execute the operation relative to derived class""" - raise NotImplementedError("Method 'execute()' must be implemented by subclasses!") + raise NotImplementedError( + "Method 'execute()' must be implemented by subclasses!") def process_dependencies(self) -> None: """Process dependencies @@ -180,7 +185,8 @@ def process_metrics_and_metadata(self) -> None: # output_path doesn't meet the requirements # treat this as a non-fatal error and log a warning logger.warning(f'Cannot create files in "{output_path}".') - OpUtil.log_operation_info("Aborted metrics and metadata processing", time.time() - t0) + OpUtil.log_operation_info( + "Aborted metrics and metadata processing", time.time() - t0) return # Name of the proprietary KFP UI metadata file. @@ -225,11 +231,13 @@ def process_metrics_and_metadata(self) -> None: except ValueError as ve: # The file content could not be parsed. Log a warning # and treat this as a non-fatal error. - logger.warning(f"Ignoring incompatible {str(src)} produced by {self.filepath}: {ve} {str(ve)}") + logger.warning( + f"Ignoring incompatible {str(src)} produced by {self.filepath}: {ve} {str(ve)}") except Exception as ex: # Something is wrong with the user-generated metadata file. 
# Log a warning and treat this as a non-fatal error. - logger.warning(f"Error processing {str(src)} produced by {self.filepath}: {ex} {str(ex)}") + logger.warning( + f"Error processing {str(src)} produced by {self.filepath}: {ex} {str(ex)}") # # Augment kfp_ui_metadata_filename with Elyra-specific information: @@ -251,7 +259,8 @@ def process_metrics_and_metadata(self) -> None: # Define HREF for COS bucket: # // bucket_url = urljoin( - urlunparse(self.cos_endpoint), f"{self.cos_bucket}/{self.input_params.get('cos-directory', '')}/" + urlunparse( + self.cos_endpoint), f"{self.cos_bucket}/{self.input_params.get('cos-directory', '')}/" ) # add Elyra metadata to 'outputs' @@ -292,7 +301,8 @@ def get_file_from_object_storage(self, file_to_get: str) -> None: object_to_get = self.get_object_storage_filename(file_to_get) t0 = time.time() - self.cos_client.fget_object(bucket_name=self.cos_bucket, object_name=object_to_get, file_path=file_to_get) + self.cos_client.fget_object( + bucket_name=self.cos_bucket, object_name=object_to_get, file_path=file_to_get) duration = time.time() - t0 OpUtil.log_operation_info( f"downloaded {file_to_get} from bucket: {self.cos_bucket}, object: {object_to_get}", duration @@ -311,7 +321,8 @@ def put_file_to_object_storage(self, file_to_upload: str, object_name: Optional[ object_to_upload = self.get_object_storage_filename(object_to_upload) t0 = time.time() - self.cos_client.fput_object(bucket_name=self.cos_bucket, object_name=object_to_upload, file_path=file_to_upload) + self.cos_client.fput_object( + bucket_name=self.cos_bucket, object_name=object_to_upload, file_path=file_to_upload) duration = time.time() - t0 OpUtil.log_operation_info( f"uploaded {file_to_upload} to bucket: {self.cos_bucket} object: {object_to_upload}", duration @@ -366,7 +377,8 @@ def execute(self) -> None: notebook_html = f"{notebook_name}.html" try: - OpUtil.log_operation_info(f"executing notebook using 'papermill {notebook} {notebook_output}'") + OpUtil.log_operation_info( + f"executing notebook using 'papermill {notebook} {notebook_output}'") t0 = time.time() # Include kernel selection in execution time kernel_name = NotebookFileOp.find_best_kernel(notebook) @@ -377,11 +389,13 @@ def execute(self) -> None: import papermill - papermill.execute_notebook(notebook, notebook_output, kernel_name=kernel_name, **kwargs) + papermill.execute_notebook( + notebook, notebook_output, kernel_name=kernel_name, **kwargs) duration = time.time() - t0 OpUtil.log_operation_info("notebook execution completed", duration) - NotebookFileOp.convert_notebook_to_html(notebook_output, notebook_html) + NotebookFileOp.convert_notebook_to_html( + notebook_output, notebook_html) self.put_file_to_object_storage(notebook_output, notebook) self.put_file_to_object_storage(notebook_html) self.process_outputs() @@ -389,7 +403,8 @@ def execute(self) -> None: # log in case of errors logger.error(f"Unexpected error: {sys.exc_info()[0]}") - NotebookFileOp.convert_notebook_to_html(notebook_output, notebook_html) + NotebookFileOp.convert_notebook_to_html( + notebook_output, notebook_html) self.put_file_to_object_storage(notebook_output, notebook) self.put_file_to_object_storage(notebook_html) raise ex @@ -405,7 +420,8 @@ def convert_notebook_to_html(notebook_file: str, html_file: str) -> str: import nbconvert import nbformat - OpUtil.log_operation_info(f"converting from {notebook_file} to {html_file}") + OpUtil.log_operation_info( + f"converting from {notebook_file} to {html_file}") t0 = time.time() nb = nbformat.read(notebook_file, 
as_version=4) html_exporter = nbconvert.HTMLExporter() @@ -415,7 +431,8 @@ def convert_notebook_to_html(notebook_file: str, html_file: str) -> str: f.close() duration = time.time() - t0 - OpUtil.log_operation_info(f"{notebook_file} converted to {html_file}", duration) + OpUtil.log_operation_info( + f"{notebook_file} converted to {html_file}", duration) return html_file @staticmethod @@ -476,7 +493,7 @@ def execute(self) -> None: try: OpUtil.log_operation_info( - f"executing python script using 'python3 {python_script}' to '{python_script_output}'" + f"executing python script using 'python3 {python_script}'" ) t0 = time.time() @@ -484,20 +501,30 @@ def execute(self) -> None: if self.parameter_pass_method == "env": self.set_parameters_in_env() - with open(python_script_output, "w") as log_file: - subprocess.run(run_args, stdout=log_file, stderr=subprocess.STDOUT, check=True) + logger.info( + "----------------------Python logs start----------------------") + process = subprocess.Popen( + run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + for line in iter(process.stdout.readline, b''): + sys.stdout.write(line.decode()) + + process.stdout.close() + return_code = process.wait() + logger.info( + "----------------------Python logs ends----------------------") + if return_code: + raise subprocess.CalledProcessError(return_code, run_args) duration = time.time() - t0 - OpUtil.log_operation_info("python script execution completed", duration) + OpUtil.log_operation_info( + "python script execution completed", duration) - self.put_file_to_object_storage(python_script_output, python_script_output) self.process_outputs() except Exception as ex: # log in case of errors logger.error(f"Unexpected error: {sys.exc_info()[0]}") logger.error(f"Error details: {ex}") - self.put_file_to_object_storage(python_script_output, python_script_output) raise ex @@ -511,27 +538,37 @@ def execute(self) -> None: r_script_output = f"{r_script_name}.log" try: - OpUtil.log_operation_info(f"executing R script using 'Rscript {r_script}' to '{r_script_output}'") + OpUtil.log_operation_info( + f"executing R script using 'Rscript {r_script}' to '{r_script_output}'") t0 = time.time() run_args = ["Rscript", r_script] if self.parameter_pass_method == "env": self.set_parameters_in_env() - with open(r_script_output, "w") as log_file: - subprocess.run(run_args, stdout=log_file, stderr=subprocess.STDOUT, check=True) + logger.info( + "----------------------R Script logs start----------------------") + process = subprocess.Popen( + run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + for line in iter(process.stdout.readline, b''): + sys.stdout.write(line.decode()) + + process.stdout.close() + return_code = process.wait() + logger.info( + "----------------------R Script logs ends----------------------") + if return_code: + raise subprocess.CalledProcessError(return_code, run_args) duration = time.time() - t0 OpUtil.log_operation_info("R script execution completed", duration) - self.put_file_to_object_storage(r_script_output, r_script_output) self.process_outputs() except Exception as ex: # log in case of errors logger.error(f"Unexpected error: {sys.exc_info()[0]}") logger.error(f"Error details: {ex}") - self.put_file_to_object_storage(r_script_output, r_script_output) raise ex @@ -566,7 +603,8 @@ def package_install(cls, user_volume_path) -> None: ) continue if version.Version(ver) > version.Version(current_packages[package]): - logger.info(f"Updating {package} package from version {current_packages[package]} to {ver}...") 
+ logger.info( + f"Updating {package} package from version {current_packages[package]} to {ver}...") to_install_list.append(f"{package}=={ver}") elif version.Version(ver) < version.Version(current_packages[package]): logger.info( @@ -574,7 +612,8 @@ def package_install(cls, user_volume_path) -> None: f"already installed. Skipping..." ) else: - logger.info(f"Package not found. Installing {package} package with version {ver}...") + logger.info( + f"Package not found. Installing {package} package with version {ver}...") to_install_list.append(f"{package}=={ver}") if to_install_list: @@ -582,7 +621,8 @@ def package_install(cls, user_volume_path) -> None: to_install_list.insert(0, f"--target={user_volume_path}") to_install_list.append("--no-cache-dir") - subprocess.run([sys.executable, "-m", "pip", "install"] + to_install_list, check=True) + subprocess.run([sys.executable, "-m", "pip", + "install"] + to_install_list, check=True) if user_volume_path: os.environ["PIP_CONFIG_FILE"] = f"{user_volume_path}/pip.conf" @@ -609,13 +649,17 @@ def package_list_to_dict(cls, filename: str) -> dict: for line in fh: if line[0] != "#": if " @ " in line: - package_name, package_version = line.strip("\n").split(sep=" @ ") + package_name, package_version = line.strip( + "\n").split(sep=" @ ") elif "===" in line: - package_name, package_version = line.strip("\n").split(sep="===") + package_name, package_version = line.strip( + "\n").split(sep="===") elif "==" in line: - package_name, package_version = line.strip("\n").split(sep="==") + package_name, package_version = line.strip( + "\n").split(sep="==") elif line.startswith("-e ") or line.startswith("--editable "): - package_name = line.strip("\n").replace("-e ", "").replace("--editable ", "") + package_name = line.strip("\n").replace( + "-e ", "").replace("--editable ", "") if "#egg=" in package_name: # editable package from version control system package_name = package_name.split("=")[-1] elif "/" in package_name: # editable package from local directory @@ -657,9 +701,12 @@ def parse_arguments(cls, args) -> dict: help="Archive containing notebook and dependency artifacts", required=True, ) - parser.add_argument("-f", "--file", dest="filepath", help="File to execute", required=True) - parser.add_argument("-o", "--outputs", dest="outputs", help="Files to output to object store", required=False) - parser.add_argument("-i", "--inputs", dest="inputs", help="Files to pull in from parent node", required=False) + parser.add_argument("-f", "--file", dest="filepath", + help="File to execute", required=True) + parser.add_argument("-o", "--outputs", dest="outputs", + help="Files to output to object store", required=False) + parser.add_argument("-i", "--inputs", dest="inputs", + help="Files to pull in from parent node", required=False) parser.add_argument( "-p", "--user-volume-path", @@ -694,7 +741,8 @@ def parse_arguments(cls, args) -> dict: # set pipeline name as global pipeline_name = parsed_args.get("pipeline-name") # operation/node name is the basename of the non-suffixed filepath, set as global - operation_name = os.path.basename(os.path.splitext(parsed_args.get("filepath"))[0]) + operation_name = os.path.basename( + os.path.splitext(parsed_args.get("filepath"))[0]) return parsed_args @@ -715,7 +763,8 @@ def log_operation_info(cls, action_clause: str, duration_secs: Optional[float] = global pipeline_name, operation_name if enable_pipeline_info: duration_clause = f"({duration_secs:.3f} secs)" if duration_secs else "" - logger.info(f"'{pipeline_name}':'{operation_name}' - 
{action_clause} {duration_clause}") + logger.info( + f"'{pipeline_name}':'{operation_name}' - {action_clause} {duration_clause}") def main(): @@ -743,4 +792,4 @@ def main(): if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/runtimes/minimal/ubi8-python-3.8/utils/bootstrapper.py b/runtimes/minimal/ubi8-python-3.8/utils/bootstrapper.py index 723e9b975..8a053a129 100644 --- a/runtimes/minimal/ubi8-python-3.8/utils/bootstrapper.py +++ b/runtimes/minimal/ubi8-python-3.8/utils/bootstrapper.py @@ -46,7 +46,8 @@ F = TypeVar("F", bound="FileOpBase") logger = logging.getLogger("elyra") -enable_pipeline_info = os.getenv("ELYRA_ENABLE_PIPELINE_INFO", "true").lower() == "true" +enable_pipeline_info = os.getenv( + "ELYRA_ENABLE_PIPELINE_INFO", "true").lower() == "true" pipeline_name = None # global used in formatted logging operation_name = None # global used in formatted logging @@ -81,8 +82,10 @@ def __init__(self, **kwargs: Any) -> None: self.cos_endpoint = urlparse(self.input_params.get("cos-endpoint")) self.cos_bucket = self.input_params.get("cos-bucket") - self.parameter_pass_method = self.input_params.get("parameter_pass_method") - self.pipeline_param_dict = self.convert_param_str_to_dict(self.input_params.get("pipeline_parameters")) + self.parameter_pass_method = self.input_params.get( + "parameter_pass_method") + self.pipeline_param_dict = self.convert_param_str_to_dict( + self.input_params.get("pipeline_parameters")) # Infer secure from the endpoint's scheme. self.secure = self.cos_endpoint.scheme == "https" @@ -104,12 +107,14 @@ def __init__(self, **kwargs: Any) -> None: ) # get minio client - self.cos_client = minio.Minio(self.cos_endpoint.netloc, secure=self.secure, credentials=cred_provider) + self.cos_client = minio.Minio( + self.cos_endpoint.netloc, secure=self.secure, credentials=cred_provider) @abstractmethod def execute(self) -> None: """Execute the operation relative to derived class""" - raise NotImplementedError("Method 'execute()' must be implemented by subclasses!") + raise NotImplementedError( + "Method 'execute()' must be implemented by subclasses!") def process_dependencies(self) -> None: """Process dependencies @@ -180,7 +185,8 @@ def process_metrics_and_metadata(self) -> None: # output_path doesn't meet the requirements # treat this as a non-fatal error and log a warning logger.warning(f'Cannot create files in "{output_path}".') - OpUtil.log_operation_info("Aborted metrics and metadata processing", time.time() - t0) + OpUtil.log_operation_info( + "Aborted metrics and metadata processing", time.time() - t0) return # Name of the proprietary KFP UI metadata file. @@ -225,11 +231,13 @@ def process_metrics_and_metadata(self) -> None: except ValueError as ve: # The file content could not be parsed. Log a warning # and treat this as a non-fatal error. - logger.warning(f"Ignoring incompatible {str(src)} produced by {self.filepath}: {ve} {str(ve)}") + logger.warning( + f"Ignoring incompatible {str(src)} produced by {self.filepath}: {ve} {str(ve)}") except Exception as ex: # Something is wrong with the user-generated metadata file. # Log a warning and treat this as a non-fatal error. 
- logger.warning(f"Error processing {str(src)} produced by {self.filepath}: {ex} {str(ex)}") + logger.warning( + f"Error processing {str(src)} produced by {self.filepath}: {ex} {str(ex)}") # # Augment kfp_ui_metadata_filename with Elyra-specific information: @@ -251,7 +259,8 @@ def process_metrics_and_metadata(self) -> None: # Define HREF for COS bucket: # // bucket_url = urljoin( - urlunparse(self.cos_endpoint), f"{self.cos_bucket}/{self.input_params.get('cos-directory', '')}/" + urlunparse( + self.cos_endpoint), f"{self.cos_bucket}/{self.input_params.get('cos-directory', '')}/" ) # add Elyra metadata to 'outputs' @@ -292,7 +301,8 @@ def get_file_from_object_storage(self, file_to_get: str) -> None: object_to_get = self.get_object_storage_filename(file_to_get) t0 = time.time() - self.cos_client.fget_object(bucket_name=self.cos_bucket, object_name=object_to_get, file_path=file_to_get) + self.cos_client.fget_object( + bucket_name=self.cos_bucket, object_name=object_to_get, file_path=file_to_get) duration = time.time() - t0 OpUtil.log_operation_info( f"downloaded {file_to_get} from bucket: {self.cos_bucket}, object: {object_to_get}", duration @@ -311,7 +321,8 @@ def put_file_to_object_storage(self, file_to_upload: str, object_name: Optional[ object_to_upload = self.get_object_storage_filename(object_to_upload) t0 = time.time() - self.cos_client.fput_object(bucket_name=self.cos_bucket, object_name=object_to_upload, file_path=file_to_upload) + self.cos_client.fput_object( + bucket_name=self.cos_bucket, object_name=object_to_upload, file_path=file_to_upload) duration = time.time() - t0 OpUtil.log_operation_info( f"uploaded {file_to_upload} to bucket: {self.cos_bucket} object: {object_to_upload}", duration @@ -366,7 +377,8 @@ def execute(self) -> None: notebook_html = f"{notebook_name}.html" try: - OpUtil.log_operation_info(f"executing notebook using 'papermill {notebook} {notebook_output}'") + OpUtil.log_operation_info( + f"executing notebook using 'papermill {notebook} {notebook_output}'") t0 = time.time() # Include kernel selection in execution time kernel_name = NotebookFileOp.find_best_kernel(notebook) @@ -377,11 +389,13 @@ def execute(self) -> None: import papermill - papermill.execute_notebook(notebook, notebook_output, kernel_name=kernel_name, **kwargs) + papermill.execute_notebook( + notebook, notebook_output, kernel_name=kernel_name, **kwargs) duration = time.time() - t0 OpUtil.log_operation_info("notebook execution completed", duration) - NotebookFileOp.convert_notebook_to_html(notebook_output, notebook_html) + NotebookFileOp.convert_notebook_to_html( + notebook_output, notebook_html) self.put_file_to_object_storage(notebook_output, notebook) self.put_file_to_object_storage(notebook_html) self.process_outputs() @@ -389,7 +403,8 @@ def execute(self) -> None: # log in case of errors logger.error(f"Unexpected error: {sys.exc_info()[0]}") - NotebookFileOp.convert_notebook_to_html(notebook_output, notebook_html) + NotebookFileOp.convert_notebook_to_html( + notebook_output, notebook_html) self.put_file_to_object_storage(notebook_output, notebook) self.put_file_to_object_storage(notebook_html) raise ex @@ -405,7 +420,8 @@ def convert_notebook_to_html(notebook_file: str, html_file: str) -> str: import nbconvert import nbformat - OpUtil.log_operation_info(f"converting from {notebook_file} to {html_file}") + OpUtil.log_operation_info( + f"converting from {notebook_file} to {html_file}") t0 = time.time() nb = nbformat.read(notebook_file, as_version=4) html_exporter = nbconvert.HTMLExporter() @@ 
-415,7 +431,8 @@ def convert_notebook_to_html(notebook_file: str, html_file: str) -> str: f.close() duration = time.time() - t0 - OpUtil.log_operation_info(f"{notebook_file} converted to {html_file}", duration) + OpUtil.log_operation_info( + f"{notebook_file} converted to {html_file}", duration) return html_file @staticmethod @@ -476,7 +493,7 @@ def execute(self) -> None: try: OpUtil.log_operation_info( - f"executing python script using 'python3 {python_script}' to '{python_script_output}'" + f"executing python script using 'python3 {python_script}'" ) t0 = time.time() @@ -484,20 +501,30 @@ def execute(self) -> None: if self.parameter_pass_method == "env": self.set_parameters_in_env() - with open(python_script_output, "w") as log_file: - subprocess.run(run_args, stdout=log_file, stderr=subprocess.STDOUT, check=True) + logger.info( + "----------------------Python logs start----------------------") + process = subprocess.Popen( + run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + for line in iter(process.stdout.readline, b''): + sys.stdout.write(line.decode()) + + process.stdout.close() + return_code = process.wait() + logger.info( + "----------------------Python logs ends----------------------") + if return_code: + raise subprocess.CalledProcessError(return_code, run_args) duration = time.time() - t0 - OpUtil.log_operation_info("python script execution completed", duration) + OpUtil.log_operation_info( + "python script execution completed", duration) - self.put_file_to_object_storage(python_script_output, python_script_output) self.process_outputs() except Exception as ex: # log in case of errors logger.error(f"Unexpected error: {sys.exc_info()[0]}") logger.error(f"Error details: {ex}") - self.put_file_to_object_storage(python_script_output, python_script_output) raise ex @@ -511,27 +538,37 @@ def execute(self) -> None: r_script_output = f"{r_script_name}.log" try: - OpUtil.log_operation_info(f"executing R script using 'Rscript {r_script}' to '{r_script_output}'") + OpUtil.log_operation_info( + f"executing R script using 'Rscript {r_script}' to '{r_script_output}'") t0 = time.time() run_args = ["Rscript", r_script] if self.parameter_pass_method == "env": self.set_parameters_in_env() - with open(r_script_output, "w") as log_file: - subprocess.run(run_args, stdout=log_file, stderr=subprocess.STDOUT, check=True) + logger.info( + "----------------------R Script logs start----------------------") + process = subprocess.Popen( + run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + for line in iter(process.stdout.readline, b''): + sys.stdout.write(line.decode()) + + process.stdout.close() + return_code = process.wait() + logger.info( + "----------------------R Script logs ends----------------------") + if return_code: + raise subprocess.CalledProcessError(return_code, run_args) duration = time.time() - t0 OpUtil.log_operation_info("R script execution completed", duration) - self.put_file_to_object_storage(r_script_output, r_script_output) self.process_outputs() except Exception as ex: # log in case of errors logger.error(f"Unexpected error: {sys.exc_info()[0]}") logger.error(f"Error details: {ex}") - self.put_file_to_object_storage(r_script_output, r_script_output) raise ex @@ -566,7 +603,8 @@ def package_install(cls, user_volume_path) -> None: ) continue if version.Version(ver) > version.Version(current_packages[package]): - logger.info(f"Updating {package} package from version {current_packages[package]} to {ver}...") + logger.info( + f"Updating {package} package from version 
{current_packages[package]} to {ver}...") to_install_list.append(f"{package}=={ver}") elif version.Version(ver) < version.Version(current_packages[package]): logger.info( @@ -574,7 +612,8 @@ def package_install(cls, user_volume_path) -> None: f"already installed. Skipping..." ) else: - logger.info(f"Package not found. Installing {package} package with version {ver}...") + logger.info( + f"Package not found. Installing {package} package with version {ver}...") to_install_list.append(f"{package}=={ver}") if to_install_list: @@ -582,7 +621,8 @@ def package_install(cls, user_volume_path) -> None: to_install_list.insert(0, f"--target={user_volume_path}") to_install_list.append("--no-cache-dir") - subprocess.run([sys.executable, "-m", "pip", "install"] + to_install_list, check=True) + subprocess.run([sys.executable, "-m", "pip", + "install"] + to_install_list, check=True) if user_volume_path: os.environ["PIP_CONFIG_FILE"] = f"{user_volume_path}/pip.conf" @@ -609,13 +649,17 @@ def package_list_to_dict(cls, filename: str) -> dict: for line in fh: if line[0] != "#": if " @ " in line: - package_name, package_version = line.strip("\n").split(sep=" @ ") + package_name, package_version = line.strip( + "\n").split(sep=" @ ") elif "===" in line: - package_name, package_version = line.strip("\n").split(sep="===") + package_name, package_version = line.strip( + "\n").split(sep="===") elif "==" in line: - package_name, package_version = line.strip("\n").split(sep="==") + package_name, package_version = line.strip( + "\n").split(sep="==") elif line.startswith("-e ") or line.startswith("--editable "): - package_name = line.strip("\n").replace("-e ", "").replace("--editable ", "") + package_name = line.strip("\n").replace( + "-e ", "").replace("--editable ", "") if "#egg=" in package_name: # editable package from version control system package_name = package_name.split("=")[-1] elif "/" in package_name: # editable package from local directory @@ -657,9 +701,12 @@ def parse_arguments(cls, args) -> dict: help="Archive containing notebook and dependency artifacts", required=True, ) - parser.add_argument("-f", "--file", dest="filepath", help="File to execute", required=True) - parser.add_argument("-o", "--outputs", dest="outputs", help="Files to output to object store", required=False) - parser.add_argument("-i", "--inputs", dest="inputs", help="Files to pull in from parent node", required=False) + parser.add_argument("-f", "--file", dest="filepath", + help="File to execute", required=True) + parser.add_argument("-o", "--outputs", dest="outputs", + help="Files to output to object store", required=False) + parser.add_argument("-i", "--inputs", dest="inputs", + help="Files to pull in from parent node", required=False) parser.add_argument( "-p", "--user-volume-path", @@ -694,7 +741,8 @@ def parse_arguments(cls, args) -> dict: # set pipeline name as global pipeline_name = parsed_args.get("pipeline-name") # operation/node name is the basename of the non-suffixed filepath, set as global - operation_name = os.path.basename(os.path.splitext(parsed_args.get("filepath"))[0]) + operation_name = os.path.basename( + os.path.splitext(parsed_args.get("filepath"))[0]) return parsed_args @@ -715,7 +763,8 @@ def log_operation_info(cls, action_clause: str, duration_secs: Optional[float] = global pipeline_name, operation_name if enable_pipeline_info: duration_clause = f"({duration_secs:.3f} secs)" if duration_secs else "" - logger.info(f"'{pipeline_name}':'{operation_name}' - {action_clause} {duration_clause}") + logger.info( + 
f"'{pipeline_name}':'{operation_name}' - {action_clause} {duration_clause}") def main(): @@ -743,4 +792,4 @@ def main(): if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/runtimes/minimal/ubi9-python-3.9/utils/bootstrapper.py b/runtimes/minimal/ubi9-python-3.9/utils/bootstrapper.py index 723e9b975..8a053a129 100644 --- a/runtimes/minimal/ubi9-python-3.9/utils/bootstrapper.py +++ b/runtimes/minimal/ubi9-python-3.9/utils/bootstrapper.py @@ -46,7 +46,8 @@ F = TypeVar("F", bound="FileOpBase") logger = logging.getLogger("elyra") -enable_pipeline_info = os.getenv("ELYRA_ENABLE_PIPELINE_INFO", "true").lower() == "true" +enable_pipeline_info = os.getenv( + "ELYRA_ENABLE_PIPELINE_INFO", "true").lower() == "true" pipeline_name = None # global used in formatted logging operation_name = None # global used in formatted logging @@ -81,8 +82,10 @@ def __init__(self, **kwargs: Any) -> None: self.cos_endpoint = urlparse(self.input_params.get("cos-endpoint")) self.cos_bucket = self.input_params.get("cos-bucket") - self.parameter_pass_method = self.input_params.get("parameter_pass_method") - self.pipeline_param_dict = self.convert_param_str_to_dict(self.input_params.get("pipeline_parameters")) + self.parameter_pass_method = self.input_params.get( + "parameter_pass_method") + self.pipeline_param_dict = self.convert_param_str_to_dict( + self.input_params.get("pipeline_parameters")) # Infer secure from the endpoint's scheme. self.secure = self.cos_endpoint.scheme == "https" @@ -104,12 +107,14 @@ def __init__(self, **kwargs: Any) -> None: ) # get minio client - self.cos_client = minio.Minio(self.cos_endpoint.netloc, secure=self.secure, credentials=cred_provider) + self.cos_client = minio.Minio( + self.cos_endpoint.netloc, secure=self.secure, credentials=cred_provider) @abstractmethod def execute(self) -> None: """Execute the operation relative to derived class""" - raise NotImplementedError("Method 'execute()' must be implemented by subclasses!") + raise NotImplementedError( + "Method 'execute()' must be implemented by subclasses!") def process_dependencies(self) -> None: """Process dependencies @@ -180,7 +185,8 @@ def process_metrics_and_metadata(self) -> None: # output_path doesn't meet the requirements # treat this as a non-fatal error and log a warning logger.warning(f'Cannot create files in "{output_path}".') - OpUtil.log_operation_info("Aborted metrics and metadata processing", time.time() - t0) + OpUtil.log_operation_info( + "Aborted metrics and metadata processing", time.time() - t0) return # Name of the proprietary KFP UI metadata file. @@ -225,11 +231,13 @@ def process_metrics_and_metadata(self) -> None: except ValueError as ve: # The file content could not be parsed. Log a warning # and treat this as a non-fatal error. - logger.warning(f"Ignoring incompatible {str(src)} produced by {self.filepath}: {ve} {str(ve)}") + logger.warning( + f"Ignoring incompatible {str(src)} produced by {self.filepath}: {ve} {str(ve)}") except Exception as ex: # Something is wrong with the user-generated metadata file. # Log a warning and treat this as a non-fatal error. 
- logger.warning(f"Error processing {str(src)} produced by {self.filepath}: {ex} {str(ex)}") + logger.warning( + f"Error processing {str(src)} produced by {self.filepath}: {ex} {str(ex)}") # # Augment kfp_ui_metadata_filename with Elyra-specific information: @@ -251,7 +259,8 @@ def process_metrics_and_metadata(self) -> None: # Define HREF for COS bucket: # // bucket_url = urljoin( - urlunparse(self.cos_endpoint), f"{self.cos_bucket}/{self.input_params.get('cos-directory', '')}/" + urlunparse( + self.cos_endpoint), f"{self.cos_bucket}/{self.input_params.get('cos-directory', '')}/" ) # add Elyra metadata to 'outputs' @@ -292,7 +301,8 @@ def get_file_from_object_storage(self, file_to_get: str) -> None: object_to_get = self.get_object_storage_filename(file_to_get) t0 = time.time() - self.cos_client.fget_object(bucket_name=self.cos_bucket, object_name=object_to_get, file_path=file_to_get) + self.cos_client.fget_object( + bucket_name=self.cos_bucket, object_name=object_to_get, file_path=file_to_get) duration = time.time() - t0 OpUtil.log_operation_info( f"downloaded {file_to_get} from bucket: {self.cos_bucket}, object: {object_to_get}", duration @@ -311,7 +321,8 @@ def put_file_to_object_storage(self, file_to_upload: str, object_name: Optional[ object_to_upload = self.get_object_storage_filename(object_to_upload) t0 = time.time() - self.cos_client.fput_object(bucket_name=self.cos_bucket, object_name=object_to_upload, file_path=file_to_upload) + self.cos_client.fput_object( + bucket_name=self.cos_bucket, object_name=object_to_upload, file_path=file_to_upload) duration = time.time() - t0 OpUtil.log_operation_info( f"uploaded {file_to_upload} to bucket: {self.cos_bucket} object: {object_to_upload}", duration @@ -366,7 +377,8 @@ def execute(self) -> None: notebook_html = f"{notebook_name}.html" try: - OpUtil.log_operation_info(f"executing notebook using 'papermill {notebook} {notebook_output}'") + OpUtil.log_operation_info( + f"executing notebook using 'papermill {notebook} {notebook_output}'") t0 = time.time() # Include kernel selection in execution time kernel_name = NotebookFileOp.find_best_kernel(notebook) @@ -377,11 +389,13 @@ def execute(self) -> None: import papermill - papermill.execute_notebook(notebook, notebook_output, kernel_name=kernel_name, **kwargs) + papermill.execute_notebook( + notebook, notebook_output, kernel_name=kernel_name, **kwargs) duration = time.time() - t0 OpUtil.log_operation_info("notebook execution completed", duration) - NotebookFileOp.convert_notebook_to_html(notebook_output, notebook_html) + NotebookFileOp.convert_notebook_to_html( + notebook_output, notebook_html) self.put_file_to_object_storage(notebook_output, notebook) self.put_file_to_object_storage(notebook_html) self.process_outputs() @@ -389,7 +403,8 @@ def execute(self) -> None: # log in case of errors logger.error(f"Unexpected error: {sys.exc_info()[0]}") - NotebookFileOp.convert_notebook_to_html(notebook_output, notebook_html) + NotebookFileOp.convert_notebook_to_html( + notebook_output, notebook_html) self.put_file_to_object_storage(notebook_output, notebook) self.put_file_to_object_storage(notebook_html) raise ex @@ -405,7 +420,8 @@ def convert_notebook_to_html(notebook_file: str, html_file: str) -> str: import nbconvert import nbformat - OpUtil.log_operation_info(f"converting from {notebook_file} to {html_file}") + OpUtil.log_operation_info( + f"converting from {notebook_file} to {html_file}") t0 = time.time() nb = nbformat.read(notebook_file, as_version=4) html_exporter = nbconvert.HTMLExporter() @@ 
-415,7 +431,8 @@ def convert_notebook_to_html(notebook_file: str, html_file: str) -> str: f.close() duration = time.time() - t0 - OpUtil.log_operation_info(f"{notebook_file} converted to {html_file}", duration) + OpUtil.log_operation_info( + f"{notebook_file} converted to {html_file}", duration) return html_file @staticmethod @@ -476,7 +493,7 @@ def execute(self) -> None: try: OpUtil.log_operation_info( - f"executing python script using 'python3 {python_script}' to '{python_script_output}'" + f"executing python script using 'python3 {python_script}'" ) t0 = time.time() @@ -484,20 +501,30 @@ def execute(self) -> None: if self.parameter_pass_method == "env": self.set_parameters_in_env() - with open(python_script_output, "w") as log_file: - subprocess.run(run_args, stdout=log_file, stderr=subprocess.STDOUT, check=True) + logger.info( + "----------------------Python logs start----------------------") + process = subprocess.Popen( + run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + for line in iter(process.stdout.readline, b''): + sys.stdout.write(line.decode()) + + process.stdout.close() + return_code = process.wait() + logger.info( + "----------------------Python logs ends----------------------") + if return_code: + raise subprocess.CalledProcessError(return_code, run_args) duration = time.time() - t0 - OpUtil.log_operation_info("python script execution completed", duration) + OpUtil.log_operation_info( + "python script execution completed", duration) - self.put_file_to_object_storage(python_script_output, python_script_output) self.process_outputs() except Exception as ex: # log in case of errors logger.error(f"Unexpected error: {sys.exc_info()[0]}") logger.error(f"Error details: {ex}") - self.put_file_to_object_storage(python_script_output, python_script_output) raise ex @@ -511,27 +538,37 @@ def execute(self) -> None: r_script_output = f"{r_script_name}.log" try: - OpUtil.log_operation_info(f"executing R script using 'Rscript {r_script}' to '{r_script_output}'") + OpUtil.log_operation_info( + f"executing R script using 'Rscript {r_script}' to '{r_script_output}'") t0 = time.time() run_args = ["Rscript", r_script] if self.parameter_pass_method == "env": self.set_parameters_in_env() - with open(r_script_output, "w") as log_file: - subprocess.run(run_args, stdout=log_file, stderr=subprocess.STDOUT, check=True) + logger.info( + "----------------------R Script logs start----------------------") + process = subprocess.Popen( + run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + for line in iter(process.stdout.readline, b''): + sys.stdout.write(line.decode()) + + process.stdout.close() + return_code = process.wait() + logger.info( + "----------------------R Script logs ends----------------------") + if return_code: + raise subprocess.CalledProcessError(return_code, run_args) duration = time.time() - t0 OpUtil.log_operation_info("R script execution completed", duration) - self.put_file_to_object_storage(r_script_output, r_script_output) self.process_outputs() except Exception as ex: # log in case of errors logger.error(f"Unexpected error: {sys.exc_info()[0]}") logger.error(f"Error details: {ex}") - self.put_file_to_object_storage(r_script_output, r_script_output) raise ex @@ -566,7 +603,8 @@ def package_install(cls, user_volume_path) -> None: ) continue if version.Version(ver) > version.Version(current_packages[package]): - logger.info(f"Updating {package} package from version {current_packages[package]} to {ver}...") + logger.info( + f"Updating {package} package from version 
{current_packages[package]} to {ver}...") to_install_list.append(f"{package}=={ver}") elif version.Version(ver) < version.Version(current_packages[package]): logger.info( @@ -574,7 +612,8 @@ def package_install(cls, user_volume_path) -> None: f"already installed. Skipping..." ) else: - logger.info(f"Package not found. Installing {package} package with version {ver}...") + logger.info( + f"Package not found. Installing {package} package with version {ver}...") to_install_list.append(f"{package}=={ver}") if to_install_list: @@ -582,7 +621,8 @@ def package_install(cls, user_volume_path) -> None: to_install_list.insert(0, f"--target={user_volume_path}") to_install_list.append("--no-cache-dir") - subprocess.run([sys.executable, "-m", "pip", "install"] + to_install_list, check=True) + subprocess.run([sys.executable, "-m", "pip", + "install"] + to_install_list, check=True) if user_volume_path: os.environ["PIP_CONFIG_FILE"] = f"{user_volume_path}/pip.conf" @@ -609,13 +649,17 @@ def package_list_to_dict(cls, filename: str) -> dict: for line in fh: if line[0] != "#": if " @ " in line: - package_name, package_version = line.strip("\n").split(sep=" @ ") + package_name, package_version = line.strip( + "\n").split(sep=" @ ") elif "===" in line: - package_name, package_version = line.strip("\n").split(sep="===") + package_name, package_version = line.strip( + "\n").split(sep="===") elif "==" in line: - package_name, package_version = line.strip("\n").split(sep="==") + package_name, package_version = line.strip( + "\n").split(sep="==") elif line.startswith("-e ") or line.startswith("--editable "): - package_name = line.strip("\n").replace("-e ", "").replace("--editable ", "") + package_name = line.strip("\n").replace( + "-e ", "").replace("--editable ", "") if "#egg=" in package_name: # editable package from version control system package_name = package_name.split("=")[-1] elif "/" in package_name: # editable package from local directory @@ -657,9 +701,12 @@ def parse_arguments(cls, args) -> dict: help="Archive containing notebook and dependency artifacts", required=True, ) - parser.add_argument("-f", "--file", dest="filepath", help="File to execute", required=True) - parser.add_argument("-o", "--outputs", dest="outputs", help="Files to output to object store", required=False) - parser.add_argument("-i", "--inputs", dest="inputs", help="Files to pull in from parent node", required=False) + parser.add_argument("-f", "--file", dest="filepath", + help="File to execute", required=True) + parser.add_argument("-o", "--outputs", dest="outputs", + help="Files to output to object store", required=False) + parser.add_argument("-i", "--inputs", dest="inputs", + help="Files to pull in from parent node", required=False) parser.add_argument( "-p", "--user-volume-path", @@ -694,7 +741,8 @@ def parse_arguments(cls, args) -> dict: # set pipeline name as global pipeline_name = parsed_args.get("pipeline-name") # operation/node name is the basename of the non-suffixed filepath, set as global - operation_name = os.path.basename(os.path.splitext(parsed_args.get("filepath"))[0]) + operation_name = os.path.basename( + os.path.splitext(parsed_args.get("filepath"))[0]) return parsed_args @@ -715,7 +763,8 @@ def log_operation_info(cls, action_clause: str, duration_secs: Optional[float] = global pipeline_name, operation_name if enable_pipeline_info: duration_clause = f"({duration_secs:.3f} secs)" if duration_secs else "" - logger.info(f"'{pipeline_name}':'{operation_name}' - {action_clause} {duration_clause}") + logger.info( + 
f"'{pipeline_name}':'{operation_name}' - {action_clause} {duration_clause}") def main(): @@ -743,4 +792,4 @@ def main(): if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/runtimes/pytorch/ubi8-python-3.8/utils/bootstrapper.py b/runtimes/pytorch/ubi8-python-3.8/utils/bootstrapper.py index 723e9b975..8a053a129 100644 --- a/runtimes/pytorch/ubi8-python-3.8/utils/bootstrapper.py +++ b/runtimes/pytorch/ubi8-python-3.8/utils/bootstrapper.py @@ -46,7 +46,8 @@ F = TypeVar("F", bound="FileOpBase") logger = logging.getLogger("elyra") -enable_pipeline_info = os.getenv("ELYRA_ENABLE_PIPELINE_INFO", "true").lower() == "true" +enable_pipeline_info = os.getenv( + "ELYRA_ENABLE_PIPELINE_INFO", "true").lower() == "true" pipeline_name = None # global used in formatted logging operation_name = None # global used in formatted logging @@ -81,8 +82,10 @@ def __init__(self, **kwargs: Any) -> None: self.cos_endpoint = urlparse(self.input_params.get("cos-endpoint")) self.cos_bucket = self.input_params.get("cos-bucket") - self.parameter_pass_method = self.input_params.get("parameter_pass_method") - self.pipeline_param_dict = self.convert_param_str_to_dict(self.input_params.get("pipeline_parameters")) + self.parameter_pass_method = self.input_params.get( + "parameter_pass_method") + self.pipeline_param_dict = self.convert_param_str_to_dict( + self.input_params.get("pipeline_parameters")) # Infer secure from the endpoint's scheme. self.secure = self.cos_endpoint.scheme == "https" @@ -104,12 +107,14 @@ def __init__(self, **kwargs: Any) -> None: ) # get minio client - self.cos_client = minio.Minio(self.cos_endpoint.netloc, secure=self.secure, credentials=cred_provider) + self.cos_client = minio.Minio( + self.cos_endpoint.netloc, secure=self.secure, credentials=cred_provider) @abstractmethod def execute(self) -> None: """Execute the operation relative to derived class""" - raise NotImplementedError("Method 'execute()' must be implemented by subclasses!") + raise NotImplementedError( + "Method 'execute()' must be implemented by subclasses!") def process_dependencies(self) -> None: """Process dependencies @@ -180,7 +185,8 @@ def process_metrics_and_metadata(self) -> None: # output_path doesn't meet the requirements # treat this as a non-fatal error and log a warning logger.warning(f'Cannot create files in "{output_path}".') - OpUtil.log_operation_info("Aborted metrics and metadata processing", time.time() - t0) + OpUtil.log_operation_info( + "Aborted metrics and metadata processing", time.time() - t0) return # Name of the proprietary KFP UI metadata file. @@ -225,11 +231,13 @@ def process_metrics_and_metadata(self) -> None: except ValueError as ve: # The file content could not be parsed. Log a warning # and treat this as a non-fatal error. - logger.warning(f"Ignoring incompatible {str(src)} produced by {self.filepath}: {ve} {str(ve)}") + logger.warning( + f"Ignoring incompatible {str(src)} produced by {self.filepath}: {ve} {str(ve)}") except Exception as ex: # Something is wrong with the user-generated metadata file. # Log a warning and treat this as a non-fatal error. 
- logger.warning(f"Error processing {str(src)} produced by {self.filepath}: {ex} {str(ex)}") + logger.warning( + f"Error processing {str(src)} produced by {self.filepath}: {ex} {str(ex)}") # # Augment kfp_ui_metadata_filename with Elyra-specific information: @@ -251,7 +259,8 @@ def process_metrics_and_metadata(self) -> None: # Define HREF for COS bucket: # // bucket_url = urljoin( - urlunparse(self.cos_endpoint), f"{self.cos_bucket}/{self.input_params.get('cos-directory', '')}/" + urlunparse( + self.cos_endpoint), f"{self.cos_bucket}/{self.input_params.get('cos-directory', '')}/" ) # add Elyra metadata to 'outputs' @@ -292,7 +301,8 @@ def get_file_from_object_storage(self, file_to_get: str) -> None: object_to_get = self.get_object_storage_filename(file_to_get) t0 = time.time() - self.cos_client.fget_object(bucket_name=self.cos_bucket, object_name=object_to_get, file_path=file_to_get) + self.cos_client.fget_object( + bucket_name=self.cos_bucket, object_name=object_to_get, file_path=file_to_get) duration = time.time() - t0 OpUtil.log_operation_info( f"downloaded {file_to_get} from bucket: {self.cos_bucket}, object: {object_to_get}", duration @@ -311,7 +321,8 @@ def put_file_to_object_storage(self, file_to_upload: str, object_name: Optional[ object_to_upload = self.get_object_storage_filename(object_to_upload) t0 = time.time() - self.cos_client.fput_object(bucket_name=self.cos_bucket, object_name=object_to_upload, file_path=file_to_upload) + self.cos_client.fput_object( + bucket_name=self.cos_bucket, object_name=object_to_upload, file_path=file_to_upload) duration = time.time() - t0 OpUtil.log_operation_info( f"uploaded {file_to_upload} to bucket: {self.cos_bucket} object: {object_to_upload}", duration @@ -366,7 +377,8 @@ def execute(self) -> None: notebook_html = f"{notebook_name}.html" try: - OpUtil.log_operation_info(f"executing notebook using 'papermill {notebook} {notebook_output}'") + OpUtil.log_operation_info( + f"executing notebook using 'papermill {notebook} {notebook_output}'") t0 = time.time() # Include kernel selection in execution time kernel_name = NotebookFileOp.find_best_kernel(notebook) @@ -377,11 +389,13 @@ def execute(self) -> None: import papermill - papermill.execute_notebook(notebook, notebook_output, kernel_name=kernel_name, **kwargs) + papermill.execute_notebook( + notebook, notebook_output, kernel_name=kernel_name, **kwargs) duration = time.time() - t0 OpUtil.log_operation_info("notebook execution completed", duration) - NotebookFileOp.convert_notebook_to_html(notebook_output, notebook_html) + NotebookFileOp.convert_notebook_to_html( + notebook_output, notebook_html) self.put_file_to_object_storage(notebook_output, notebook) self.put_file_to_object_storage(notebook_html) self.process_outputs() @@ -389,7 +403,8 @@ def execute(self) -> None: # log in case of errors logger.error(f"Unexpected error: {sys.exc_info()[0]}") - NotebookFileOp.convert_notebook_to_html(notebook_output, notebook_html) + NotebookFileOp.convert_notebook_to_html( + notebook_output, notebook_html) self.put_file_to_object_storage(notebook_output, notebook) self.put_file_to_object_storage(notebook_html) raise ex @@ -405,7 +420,8 @@ def convert_notebook_to_html(notebook_file: str, html_file: str) -> str: import nbconvert import nbformat - OpUtil.log_operation_info(f"converting from {notebook_file} to {html_file}") + OpUtil.log_operation_info( + f"converting from {notebook_file} to {html_file}") t0 = time.time() nb = nbformat.read(notebook_file, as_version=4) html_exporter = nbconvert.HTMLExporter() @@ 
-415,7 +431,8 @@ def convert_notebook_to_html(notebook_file: str, html_file: str) -> str: f.close() duration = time.time() - t0 - OpUtil.log_operation_info(f"{notebook_file} converted to {html_file}", duration) + OpUtil.log_operation_info( + f"{notebook_file} converted to {html_file}", duration) return html_file @staticmethod @@ -476,7 +493,7 @@ def execute(self) -> None: try: OpUtil.log_operation_info( - f"executing python script using 'python3 {python_script}' to '{python_script_output}'" + f"executing python script using 'python3 {python_script}'" ) t0 = time.time() @@ -484,20 +501,30 @@ def execute(self) -> None: if self.parameter_pass_method == "env": self.set_parameters_in_env() - with open(python_script_output, "w") as log_file: - subprocess.run(run_args, stdout=log_file, stderr=subprocess.STDOUT, check=True) + logger.info( + "----------------------Python logs start----------------------") + process = subprocess.Popen( + run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + for line in iter(process.stdout.readline, b''): + sys.stdout.write(line.decode()) + + process.stdout.close() + return_code = process.wait() + logger.info( + "----------------------Python logs ends----------------------") + if return_code: + raise subprocess.CalledProcessError(return_code, run_args) duration = time.time() - t0 - OpUtil.log_operation_info("python script execution completed", duration) + OpUtil.log_operation_info( + "python script execution completed", duration) - self.put_file_to_object_storage(python_script_output, python_script_output) self.process_outputs() except Exception as ex: # log in case of errors logger.error(f"Unexpected error: {sys.exc_info()[0]}") logger.error(f"Error details: {ex}") - self.put_file_to_object_storage(python_script_output, python_script_output) raise ex @@ -511,27 +538,37 @@ def execute(self) -> None: r_script_output = f"{r_script_name}.log" try: - OpUtil.log_operation_info(f"executing R script using 'Rscript {r_script}' to '{r_script_output}'") + OpUtil.log_operation_info( + f"executing R script using 'Rscript {r_script}' to '{r_script_output}'") t0 = time.time() run_args = ["Rscript", r_script] if self.parameter_pass_method == "env": self.set_parameters_in_env() - with open(r_script_output, "w") as log_file: - subprocess.run(run_args, stdout=log_file, stderr=subprocess.STDOUT, check=True) + logger.info( + "----------------------R Script logs start----------------------") + process = subprocess.Popen( + run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + for line in iter(process.stdout.readline, b''): + sys.stdout.write(line.decode()) + + process.stdout.close() + return_code = process.wait() + logger.info( + "----------------------R Script logs ends----------------------") + if return_code: + raise subprocess.CalledProcessError(return_code, run_args) duration = time.time() - t0 OpUtil.log_operation_info("R script execution completed", duration) - self.put_file_to_object_storage(r_script_output, r_script_output) self.process_outputs() except Exception as ex: # log in case of errors logger.error(f"Unexpected error: {sys.exc_info()[0]}") logger.error(f"Error details: {ex}") - self.put_file_to_object_storage(r_script_output, r_script_output) raise ex @@ -566,7 +603,8 @@ def package_install(cls, user_volume_path) -> None: ) continue if version.Version(ver) > version.Version(current_packages[package]): - logger.info(f"Updating {package} package from version {current_packages[package]} to {ver}...") + logger.info( + f"Updating {package} package from version 
{current_packages[package]} to {ver}...") to_install_list.append(f"{package}=={ver}") elif version.Version(ver) < version.Version(current_packages[package]): logger.info( @@ -574,7 +612,8 @@ def package_install(cls, user_volume_path) -> None: f"already installed. Skipping..." ) else: - logger.info(f"Package not found. Installing {package} package with version {ver}...") + logger.info( + f"Package not found. Installing {package} package with version {ver}...") to_install_list.append(f"{package}=={ver}") if to_install_list: @@ -582,7 +621,8 @@ def package_install(cls, user_volume_path) -> None: to_install_list.insert(0, f"--target={user_volume_path}") to_install_list.append("--no-cache-dir") - subprocess.run([sys.executable, "-m", "pip", "install"] + to_install_list, check=True) + subprocess.run([sys.executable, "-m", "pip", + "install"] + to_install_list, check=True) if user_volume_path: os.environ["PIP_CONFIG_FILE"] = f"{user_volume_path}/pip.conf" @@ -609,13 +649,17 @@ def package_list_to_dict(cls, filename: str) -> dict: for line in fh: if line[0] != "#": if " @ " in line: - package_name, package_version = line.strip("\n").split(sep=" @ ") + package_name, package_version = line.strip( + "\n").split(sep=" @ ") elif "===" in line: - package_name, package_version = line.strip("\n").split(sep="===") + package_name, package_version = line.strip( + "\n").split(sep="===") elif "==" in line: - package_name, package_version = line.strip("\n").split(sep="==") + package_name, package_version = line.strip( + "\n").split(sep="==") elif line.startswith("-e ") or line.startswith("--editable "): - package_name = line.strip("\n").replace("-e ", "").replace("--editable ", "") + package_name = line.strip("\n").replace( + "-e ", "").replace("--editable ", "") if "#egg=" in package_name: # editable package from version control system package_name = package_name.split("=")[-1] elif "/" in package_name: # editable package from local directory @@ -657,9 +701,12 @@ def parse_arguments(cls, args) -> dict: help="Archive containing notebook and dependency artifacts", required=True, ) - parser.add_argument("-f", "--file", dest="filepath", help="File to execute", required=True) - parser.add_argument("-o", "--outputs", dest="outputs", help="Files to output to object store", required=False) - parser.add_argument("-i", "--inputs", dest="inputs", help="Files to pull in from parent node", required=False) + parser.add_argument("-f", "--file", dest="filepath", + help="File to execute", required=True) + parser.add_argument("-o", "--outputs", dest="outputs", + help="Files to output to object store", required=False) + parser.add_argument("-i", "--inputs", dest="inputs", + help="Files to pull in from parent node", required=False) parser.add_argument( "-p", "--user-volume-path", @@ -694,7 +741,8 @@ def parse_arguments(cls, args) -> dict: # set pipeline name as global pipeline_name = parsed_args.get("pipeline-name") # operation/node name is the basename of the non-suffixed filepath, set as global - operation_name = os.path.basename(os.path.splitext(parsed_args.get("filepath"))[0]) + operation_name = os.path.basename( + os.path.splitext(parsed_args.get("filepath"))[0]) return parsed_args @@ -715,7 +763,8 @@ def log_operation_info(cls, action_clause: str, duration_secs: Optional[float] = global pipeline_name, operation_name if enable_pipeline_info: duration_clause = f"({duration_secs:.3f} secs)" if duration_secs else "" - logger.info(f"'{pipeline_name}':'{operation_name}' - {action_clause} {duration_clause}") + logger.info( + 
f"'{pipeline_name}':'{operation_name}' - {action_clause} {duration_clause}") def main(): @@ -743,4 +792,4 @@ def main(): if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/runtimes/pytorch/ubi9-python-3.9/utils/bootstrapper.py b/runtimes/pytorch/ubi9-python-3.9/utils/bootstrapper.py index 723e9b975..8a053a129 100644 --- a/runtimes/pytorch/ubi9-python-3.9/utils/bootstrapper.py +++ b/runtimes/pytorch/ubi9-python-3.9/utils/bootstrapper.py @@ -46,7 +46,8 @@ F = TypeVar("F", bound="FileOpBase") logger = logging.getLogger("elyra") -enable_pipeline_info = os.getenv("ELYRA_ENABLE_PIPELINE_INFO", "true").lower() == "true" +enable_pipeline_info = os.getenv( + "ELYRA_ENABLE_PIPELINE_INFO", "true").lower() == "true" pipeline_name = None # global used in formatted logging operation_name = None # global used in formatted logging @@ -81,8 +82,10 @@ def __init__(self, **kwargs: Any) -> None: self.cos_endpoint = urlparse(self.input_params.get("cos-endpoint")) self.cos_bucket = self.input_params.get("cos-bucket") - self.parameter_pass_method = self.input_params.get("parameter_pass_method") - self.pipeline_param_dict = self.convert_param_str_to_dict(self.input_params.get("pipeline_parameters")) + self.parameter_pass_method = self.input_params.get( + "parameter_pass_method") + self.pipeline_param_dict = self.convert_param_str_to_dict( + self.input_params.get("pipeline_parameters")) # Infer secure from the endpoint's scheme. self.secure = self.cos_endpoint.scheme == "https" @@ -104,12 +107,14 @@ def __init__(self, **kwargs: Any) -> None: ) # get minio client - self.cos_client = minio.Minio(self.cos_endpoint.netloc, secure=self.secure, credentials=cred_provider) + self.cos_client = minio.Minio( + self.cos_endpoint.netloc, secure=self.secure, credentials=cred_provider) @abstractmethod def execute(self) -> None: """Execute the operation relative to derived class""" - raise NotImplementedError("Method 'execute()' must be implemented by subclasses!") + raise NotImplementedError( + "Method 'execute()' must be implemented by subclasses!") def process_dependencies(self) -> None: """Process dependencies @@ -180,7 +185,8 @@ def process_metrics_and_metadata(self) -> None: # output_path doesn't meet the requirements # treat this as a non-fatal error and log a warning logger.warning(f'Cannot create files in "{output_path}".') - OpUtil.log_operation_info("Aborted metrics and metadata processing", time.time() - t0) + OpUtil.log_operation_info( + "Aborted metrics and metadata processing", time.time() - t0) return # Name of the proprietary KFP UI metadata file. @@ -225,11 +231,13 @@ def process_metrics_and_metadata(self) -> None: except ValueError as ve: # The file content could not be parsed. Log a warning # and treat this as a non-fatal error. - logger.warning(f"Ignoring incompatible {str(src)} produced by {self.filepath}: {ve} {str(ve)}") + logger.warning( + f"Ignoring incompatible {str(src)} produced by {self.filepath}: {ve} {str(ve)}") except Exception as ex: # Something is wrong with the user-generated metadata file. # Log a warning and treat this as a non-fatal error. 
- logger.warning(f"Error processing {str(src)} produced by {self.filepath}: {ex} {str(ex)}") + logger.warning( + f"Error processing {str(src)} produced by {self.filepath}: {ex} {str(ex)}") # # Augment kfp_ui_metadata_filename with Elyra-specific information: @@ -251,7 +259,8 @@ def process_metrics_and_metadata(self) -> None: # Define HREF for COS bucket: # // bucket_url = urljoin( - urlunparse(self.cos_endpoint), f"{self.cos_bucket}/{self.input_params.get('cos-directory', '')}/" + urlunparse( + self.cos_endpoint), f"{self.cos_bucket}/{self.input_params.get('cos-directory', '')}/" ) # add Elyra metadata to 'outputs' @@ -292,7 +301,8 @@ def get_file_from_object_storage(self, file_to_get: str) -> None: object_to_get = self.get_object_storage_filename(file_to_get) t0 = time.time() - self.cos_client.fget_object(bucket_name=self.cos_bucket, object_name=object_to_get, file_path=file_to_get) + self.cos_client.fget_object( + bucket_name=self.cos_bucket, object_name=object_to_get, file_path=file_to_get) duration = time.time() - t0 OpUtil.log_operation_info( f"downloaded {file_to_get} from bucket: {self.cos_bucket}, object: {object_to_get}", duration @@ -311,7 +321,8 @@ def put_file_to_object_storage(self, file_to_upload: str, object_name: Optional[ object_to_upload = self.get_object_storage_filename(object_to_upload) t0 = time.time() - self.cos_client.fput_object(bucket_name=self.cos_bucket, object_name=object_to_upload, file_path=file_to_upload) + self.cos_client.fput_object( + bucket_name=self.cos_bucket, object_name=object_to_upload, file_path=file_to_upload) duration = time.time() - t0 OpUtil.log_operation_info( f"uploaded {file_to_upload} to bucket: {self.cos_bucket} object: {object_to_upload}", duration @@ -366,7 +377,8 @@ def execute(self) -> None: notebook_html = f"{notebook_name}.html" try: - OpUtil.log_operation_info(f"executing notebook using 'papermill {notebook} {notebook_output}'") + OpUtil.log_operation_info( + f"executing notebook using 'papermill {notebook} {notebook_output}'") t0 = time.time() # Include kernel selection in execution time kernel_name = NotebookFileOp.find_best_kernel(notebook) @@ -377,11 +389,13 @@ def execute(self) -> None: import papermill - papermill.execute_notebook(notebook, notebook_output, kernel_name=kernel_name, **kwargs) + papermill.execute_notebook( + notebook, notebook_output, kernel_name=kernel_name, **kwargs) duration = time.time() - t0 OpUtil.log_operation_info("notebook execution completed", duration) - NotebookFileOp.convert_notebook_to_html(notebook_output, notebook_html) + NotebookFileOp.convert_notebook_to_html( + notebook_output, notebook_html) self.put_file_to_object_storage(notebook_output, notebook) self.put_file_to_object_storage(notebook_html) self.process_outputs() @@ -389,7 +403,8 @@ def execute(self) -> None: # log in case of errors logger.error(f"Unexpected error: {sys.exc_info()[0]}") - NotebookFileOp.convert_notebook_to_html(notebook_output, notebook_html) + NotebookFileOp.convert_notebook_to_html( + notebook_output, notebook_html) self.put_file_to_object_storage(notebook_output, notebook) self.put_file_to_object_storage(notebook_html) raise ex @@ -405,7 +420,8 @@ def convert_notebook_to_html(notebook_file: str, html_file: str) -> str: import nbconvert import nbformat - OpUtil.log_operation_info(f"converting from {notebook_file} to {html_file}") + OpUtil.log_operation_info( + f"converting from {notebook_file} to {html_file}") t0 = time.time() nb = nbformat.read(notebook_file, as_version=4) html_exporter = nbconvert.HTMLExporter() @@ 
-415,7 +431,8 @@ def convert_notebook_to_html(notebook_file: str, html_file: str) -> str: f.close() duration = time.time() - t0 - OpUtil.log_operation_info(f"{notebook_file} converted to {html_file}", duration) + OpUtil.log_operation_info( + f"{notebook_file} converted to {html_file}", duration) return html_file @staticmethod @@ -476,7 +493,7 @@ def execute(self) -> None: try: OpUtil.log_operation_info( - f"executing python script using 'python3 {python_script}' to '{python_script_output}'" + f"executing python script using 'python3 {python_script}'" ) t0 = time.time() @@ -484,20 +501,30 @@ def execute(self) -> None: if self.parameter_pass_method == "env": self.set_parameters_in_env() - with open(python_script_output, "w") as log_file: - subprocess.run(run_args, stdout=log_file, stderr=subprocess.STDOUT, check=True) + logger.info( + "----------------------Python logs start----------------------") + process = subprocess.Popen( + run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + for line in iter(process.stdout.readline, b''): + sys.stdout.write(line.decode()) + + process.stdout.close() + return_code = process.wait() + logger.info( + "----------------------Python logs ends----------------------") + if return_code: + raise subprocess.CalledProcessError(return_code, run_args) duration = time.time() - t0 - OpUtil.log_operation_info("python script execution completed", duration) + OpUtil.log_operation_info( + "python script execution completed", duration) - self.put_file_to_object_storage(python_script_output, python_script_output) self.process_outputs() except Exception as ex: # log in case of errors logger.error(f"Unexpected error: {sys.exc_info()[0]}") logger.error(f"Error details: {ex}") - self.put_file_to_object_storage(python_script_output, python_script_output) raise ex @@ -511,27 +538,37 @@ def execute(self) -> None: r_script_output = f"{r_script_name}.log" try: - OpUtil.log_operation_info(f"executing R script using 'Rscript {r_script}' to '{r_script_output}'") + OpUtil.log_operation_info( + f"executing R script using 'Rscript {r_script}' to '{r_script_output}'") t0 = time.time() run_args = ["Rscript", r_script] if self.parameter_pass_method == "env": self.set_parameters_in_env() - with open(r_script_output, "w") as log_file: - subprocess.run(run_args, stdout=log_file, stderr=subprocess.STDOUT, check=True) + logger.info( + "----------------------R Script logs start----------------------") + process = subprocess.Popen( + run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + for line in iter(process.stdout.readline, b''): + sys.stdout.write(line.decode()) + + process.stdout.close() + return_code = process.wait() + logger.info( + "----------------------R Script logs ends----------------------") + if return_code: + raise subprocess.CalledProcessError(return_code, run_args) duration = time.time() - t0 OpUtil.log_operation_info("R script execution completed", duration) - self.put_file_to_object_storage(r_script_output, r_script_output) self.process_outputs() except Exception as ex: # log in case of errors logger.error(f"Unexpected error: {sys.exc_info()[0]}") logger.error(f"Error details: {ex}") - self.put_file_to_object_storage(r_script_output, r_script_output) raise ex @@ -566,7 +603,8 @@ def package_install(cls, user_volume_path) -> None: ) continue if version.Version(ver) > version.Version(current_packages[package]): - logger.info(f"Updating {package} package from version {current_packages[package]} to {ver}...") + logger.info( + f"Updating {package} package from version 
{current_packages[package]} to {ver}...") to_install_list.append(f"{package}=={ver}") elif version.Version(ver) < version.Version(current_packages[package]): logger.info( @@ -574,7 +612,8 @@ def package_install(cls, user_volume_path) -> None: f"already installed. Skipping..." ) else: - logger.info(f"Package not found. Installing {package} package with version {ver}...") + logger.info( + f"Package not found. Installing {package} package with version {ver}...") to_install_list.append(f"{package}=={ver}") if to_install_list: @@ -582,7 +621,8 @@ def package_install(cls, user_volume_path) -> None: to_install_list.insert(0, f"--target={user_volume_path}") to_install_list.append("--no-cache-dir") - subprocess.run([sys.executable, "-m", "pip", "install"] + to_install_list, check=True) + subprocess.run([sys.executable, "-m", "pip", + "install"] + to_install_list, check=True) if user_volume_path: os.environ["PIP_CONFIG_FILE"] = f"{user_volume_path}/pip.conf" @@ -609,13 +649,17 @@ def package_list_to_dict(cls, filename: str) -> dict: for line in fh: if line[0] != "#": if " @ " in line: - package_name, package_version = line.strip("\n").split(sep=" @ ") + package_name, package_version = line.strip( + "\n").split(sep=" @ ") elif "===" in line: - package_name, package_version = line.strip("\n").split(sep="===") + package_name, package_version = line.strip( + "\n").split(sep="===") elif "==" in line: - package_name, package_version = line.strip("\n").split(sep="==") + package_name, package_version = line.strip( + "\n").split(sep="==") elif line.startswith("-e ") or line.startswith("--editable "): - package_name = line.strip("\n").replace("-e ", "").replace("--editable ", "") + package_name = line.strip("\n").replace( + "-e ", "").replace("--editable ", "") if "#egg=" in package_name: # editable package from version control system package_name = package_name.split("=")[-1] elif "/" in package_name: # editable package from local directory @@ -657,9 +701,12 @@ def parse_arguments(cls, args) -> dict: help="Archive containing notebook and dependency artifacts", required=True, ) - parser.add_argument("-f", "--file", dest="filepath", help="File to execute", required=True) - parser.add_argument("-o", "--outputs", dest="outputs", help="Files to output to object store", required=False) - parser.add_argument("-i", "--inputs", dest="inputs", help="Files to pull in from parent node", required=False) + parser.add_argument("-f", "--file", dest="filepath", + help="File to execute", required=True) + parser.add_argument("-o", "--outputs", dest="outputs", + help="Files to output to object store", required=False) + parser.add_argument("-i", "--inputs", dest="inputs", + help="Files to pull in from parent node", required=False) parser.add_argument( "-p", "--user-volume-path", @@ -694,7 +741,8 @@ def parse_arguments(cls, args) -> dict: # set pipeline name as global pipeline_name = parsed_args.get("pipeline-name") # operation/node name is the basename of the non-suffixed filepath, set as global - operation_name = os.path.basename(os.path.splitext(parsed_args.get("filepath"))[0]) + operation_name = os.path.basename( + os.path.splitext(parsed_args.get("filepath"))[0]) return parsed_args @@ -715,7 +763,8 @@ def log_operation_info(cls, action_clause: str, duration_secs: Optional[float] = global pipeline_name, operation_name if enable_pipeline_info: duration_clause = f"({duration_secs:.3f} secs)" if duration_secs else "" - logger.info(f"'{pipeline_name}':'{operation_name}' - {action_clause} {duration_clause}") + logger.info( + 
f"'{pipeline_name}':'{operation_name}' - {action_clause} {duration_clause}") def main(): @@ -743,4 +792,4 @@ def main(): if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/runtimes/tensorflow/ubi8-python-3.8/utils/bootstrapper.py b/runtimes/tensorflow/ubi8-python-3.8/utils/bootstrapper.py index 723e9b975..8a053a129 100644 --- a/runtimes/tensorflow/ubi8-python-3.8/utils/bootstrapper.py +++ b/runtimes/tensorflow/ubi8-python-3.8/utils/bootstrapper.py @@ -46,7 +46,8 @@ F = TypeVar("F", bound="FileOpBase") logger = logging.getLogger("elyra") -enable_pipeline_info = os.getenv("ELYRA_ENABLE_PIPELINE_INFO", "true").lower() == "true" +enable_pipeline_info = os.getenv( + "ELYRA_ENABLE_PIPELINE_INFO", "true").lower() == "true" pipeline_name = None # global used in formatted logging operation_name = None # global used in formatted logging @@ -81,8 +82,10 @@ def __init__(self, **kwargs: Any) -> None: self.cos_endpoint = urlparse(self.input_params.get("cos-endpoint")) self.cos_bucket = self.input_params.get("cos-bucket") - self.parameter_pass_method = self.input_params.get("parameter_pass_method") - self.pipeline_param_dict = self.convert_param_str_to_dict(self.input_params.get("pipeline_parameters")) + self.parameter_pass_method = self.input_params.get( + "parameter_pass_method") + self.pipeline_param_dict = self.convert_param_str_to_dict( + self.input_params.get("pipeline_parameters")) # Infer secure from the endpoint's scheme. self.secure = self.cos_endpoint.scheme == "https" @@ -104,12 +107,14 @@ def __init__(self, **kwargs: Any) -> None: ) # get minio client - self.cos_client = minio.Minio(self.cos_endpoint.netloc, secure=self.secure, credentials=cred_provider) + self.cos_client = minio.Minio( + self.cos_endpoint.netloc, secure=self.secure, credentials=cred_provider) @abstractmethod def execute(self) -> None: """Execute the operation relative to derived class""" - raise NotImplementedError("Method 'execute()' must be implemented by subclasses!") + raise NotImplementedError( + "Method 'execute()' must be implemented by subclasses!") def process_dependencies(self) -> None: """Process dependencies @@ -180,7 +185,8 @@ def process_metrics_and_metadata(self) -> None: # output_path doesn't meet the requirements # treat this as a non-fatal error and log a warning logger.warning(f'Cannot create files in "{output_path}".') - OpUtil.log_operation_info("Aborted metrics and metadata processing", time.time() - t0) + OpUtil.log_operation_info( + "Aborted metrics and metadata processing", time.time() - t0) return # Name of the proprietary KFP UI metadata file. @@ -225,11 +231,13 @@ def process_metrics_and_metadata(self) -> None: except ValueError as ve: # The file content could not be parsed. Log a warning # and treat this as a non-fatal error. - logger.warning(f"Ignoring incompatible {str(src)} produced by {self.filepath}: {ve} {str(ve)}") + logger.warning( + f"Ignoring incompatible {str(src)} produced by {self.filepath}: {ve} {str(ve)}") except Exception as ex: # Something is wrong with the user-generated metadata file. # Log a warning and treat this as a non-fatal error. 
- logger.warning(f"Error processing {str(src)} produced by {self.filepath}: {ex} {str(ex)}") + logger.warning( + f"Error processing {str(src)} produced by {self.filepath}: {ex} {str(ex)}") # # Augment kfp_ui_metadata_filename with Elyra-specific information: @@ -251,7 +259,8 @@ def process_metrics_and_metadata(self) -> None: # Define HREF for COS bucket: # // bucket_url = urljoin( - urlunparse(self.cos_endpoint), f"{self.cos_bucket}/{self.input_params.get('cos-directory', '')}/" + urlunparse( + self.cos_endpoint), f"{self.cos_bucket}/{self.input_params.get('cos-directory', '')}/" ) # add Elyra metadata to 'outputs' @@ -292,7 +301,8 @@ def get_file_from_object_storage(self, file_to_get: str) -> None: object_to_get = self.get_object_storage_filename(file_to_get) t0 = time.time() - self.cos_client.fget_object(bucket_name=self.cos_bucket, object_name=object_to_get, file_path=file_to_get) + self.cos_client.fget_object( + bucket_name=self.cos_bucket, object_name=object_to_get, file_path=file_to_get) duration = time.time() - t0 OpUtil.log_operation_info( f"downloaded {file_to_get} from bucket: {self.cos_bucket}, object: {object_to_get}", duration @@ -311,7 +321,8 @@ def put_file_to_object_storage(self, file_to_upload: str, object_name: Optional[ object_to_upload = self.get_object_storage_filename(object_to_upload) t0 = time.time() - self.cos_client.fput_object(bucket_name=self.cos_bucket, object_name=object_to_upload, file_path=file_to_upload) + self.cos_client.fput_object( + bucket_name=self.cos_bucket, object_name=object_to_upload, file_path=file_to_upload) duration = time.time() - t0 OpUtil.log_operation_info( f"uploaded {file_to_upload} to bucket: {self.cos_bucket} object: {object_to_upload}", duration @@ -366,7 +377,8 @@ def execute(self) -> None: notebook_html = f"{notebook_name}.html" try: - OpUtil.log_operation_info(f"executing notebook using 'papermill {notebook} {notebook_output}'") + OpUtil.log_operation_info( + f"executing notebook using 'papermill {notebook} {notebook_output}'") t0 = time.time() # Include kernel selection in execution time kernel_name = NotebookFileOp.find_best_kernel(notebook) @@ -377,11 +389,13 @@ def execute(self) -> None: import papermill - papermill.execute_notebook(notebook, notebook_output, kernel_name=kernel_name, **kwargs) + papermill.execute_notebook( + notebook, notebook_output, kernel_name=kernel_name, **kwargs) duration = time.time() - t0 OpUtil.log_operation_info("notebook execution completed", duration) - NotebookFileOp.convert_notebook_to_html(notebook_output, notebook_html) + NotebookFileOp.convert_notebook_to_html( + notebook_output, notebook_html) self.put_file_to_object_storage(notebook_output, notebook) self.put_file_to_object_storage(notebook_html) self.process_outputs() @@ -389,7 +403,8 @@ def execute(self) -> None: # log in case of errors logger.error(f"Unexpected error: {sys.exc_info()[0]}") - NotebookFileOp.convert_notebook_to_html(notebook_output, notebook_html) + NotebookFileOp.convert_notebook_to_html( + notebook_output, notebook_html) self.put_file_to_object_storage(notebook_output, notebook) self.put_file_to_object_storage(notebook_html) raise ex @@ -405,7 +420,8 @@ def convert_notebook_to_html(notebook_file: str, html_file: str) -> str: import nbconvert import nbformat - OpUtil.log_operation_info(f"converting from {notebook_file} to {html_file}") + OpUtil.log_operation_info( + f"converting from {notebook_file} to {html_file}") t0 = time.time() nb = nbformat.read(notebook_file, as_version=4) html_exporter = nbconvert.HTMLExporter() @@ 
-415,7 +431,8 @@ def convert_notebook_to_html(notebook_file: str, html_file: str) -> str: f.close() duration = time.time() - t0 - OpUtil.log_operation_info(f"{notebook_file} converted to {html_file}", duration) + OpUtil.log_operation_info( + f"{notebook_file} converted to {html_file}", duration) return html_file @staticmethod @@ -476,7 +493,7 @@ def execute(self) -> None: try: OpUtil.log_operation_info( - f"executing python script using 'python3 {python_script}' to '{python_script_output}'" + f"executing python script using 'python3 {python_script}'" ) t0 = time.time() @@ -484,20 +501,30 @@ def execute(self) -> None: if self.parameter_pass_method == "env": self.set_parameters_in_env() - with open(python_script_output, "w") as log_file: - subprocess.run(run_args, stdout=log_file, stderr=subprocess.STDOUT, check=True) + logger.info( + "----------------------Python logs start----------------------") + process = subprocess.Popen( + run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + for line in iter(process.stdout.readline, b''): + sys.stdout.write(line.decode()) + + process.stdout.close() + return_code = process.wait() + logger.info( + "----------------------Python logs ends----------------------") + if return_code: + raise subprocess.CalledProcessError(return_code, run_args) duration = time.time() - t0 - OpUtil.log_operation_info("python script execution completed", duration) + OpUtil.log_operation_info( + "python script execution completed", duration) - self.put_file_to_object_storage(python_script_output, python_script_output) self.process_outputs() except Exception as ex: # log in case of errors logger.error(f"Unexpected error: {sys.exc_info()[0]}") logger.error(f"Error details: {ex}") - self.put_file_to_object_storage(python_script_output, python_script_output) raise ex @@ -511,27 +538,37 @@ def execute(self) -> None: r_script_output = f"{r_script_name}.log" try: - OpUtil.log_operation_info(f"executing R script using 'Rscript {r_script}' to '{r_script_output}'") + OpUtil.log_operation_info( + f"executing R script using 'Rscript {r_script}' to '{r_script_output}'") t0 = time.time() run_args = ["Rscript", r_script] if self.parameter_pass_method == "env": self.set_parameters_in_env() - with open(r_script_output, "w") as log_file: - subprocess.run(run_args, stdout=log_file, stderr=subprocess.STDOUT, check=True) + logger.info( + "----------------------R Script logs start----------------------") + process = subprocess.Popen( + run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + for line in iter(process.stdout.readline, b''): + sys.stdout.write(line.decode()) + + process.stdout.close() + return_code = process.wait() + logger.info( + "----------------------R Script logs ends----------------------") + if return_code: + raise subprocess.CalledProcessError(return_code, run_args) duration = time.time() - t0 OpUtil.log_operation_info("R script execution completed", duration) - self.put_file_to_object_storage(r_script_output, r_script_output) self.process_outputs() except Exception as ex: # log in case of errors logger.error(f"Unexpected error: {sys.exc_info()[0]}") logger.error(f"Error details: {ex}") - self.put_file_to_object_storage(r_script_output, r_script_output) raise ex @@ -566,7 +603,8 @@ def package_install(cls, user_volume_path) -> None: ) continue if version.Version(ver) > version.Version(current_packages[package]): - logger.info(f"Updating {package} package from version {current_packages[package]} to {ver}...") + logger.info( + f"Updating {package} package from version 
{current_packages[package]} to {ver}...") to_install_list.append(f"{package}=={ver}") elif version.Version(ver) < version.Version(current_packages[package]): logger.info( @@ -574,7 +612,8 @@ def package_install(cls, user_volume_path) -> None: f"already installed. Skipping..." ) else: - logger.info(f"Package not found. Installing {package} package with version {ver}...") + logger.info( + f"Package not found. Installing {package} package with version {ver}...") to_install_list.append(f"{package}=={ver}") if to_install_list: @@ -582,7 +621,8 @@ def package_install(cls, user_volume_path) -> None: to_install_list.insert(0, f"--target={user_volume_path}") to_install_list.append("--no-cache-dir") - subprocess.run([sys.executable, "-m", "pip", "install"] + to_install_list, check=True) + subprocess.run([sys.executable, "-m", "pip", + "install"] + to_install_list, check=True) if user_volume_path: os.environ["PIP_CONFIG_FILE"] = f"{user_volume_path}/pip.conf" @@ -609,13 +649,17 @@ def package_list_to_dict(cls, filename: str) -> dict: for line in fh: if line[0] != "#": if " @ " in line: - package_name, package_version = line.strip("\n").split(sep=" @ ") + package_name, package_version = line.strip( + "\n").split(sep=" @ ") elif "===" in line: - package_name, package_version = line.strip("\n").split(sep="===") + package_name, package_version = line.strip( + "\n").split(sep="===") elif "==" in line: - package_name, package_version = line.strip("\n").split(sep="==") + package_name, package_version = line.strip( + "\n").split(sep="==") elif line.startswith("-e ") or line.startswith("--editable "): - package_name = line.strip("\n").replace("-e ", "").replace("--editable ", "") + package_name = line.strip("\n").replace( + "-e ", "").replace("--editable ", "") if "#egg=" in package_name: # editable package from version control system package_name = package_name.split("=")[-1] elif "/" in package_name: # editable package from local directory @@ -657,9 +701,12 @@ def parse_arguments(cls, args) -> dict: help="Archive containing notebook and dependency artifacts", required=True, ) - parser.add_argument("-f", "--file", dest="filepath", help="File to execute", required=True) - parser.add_argument("-o", "--outputs", dest="outputs", help="Files to output to object store", required=False) - parser.add_argument("-i", "--inputs", dest="inputs", help="Files to pull in from parent node", required=False) + parser.add_argument("-f", "--file", dest="filepath", + help="File to execute", required=True) + parser.add_argument("-o", "--outputs", dest="outputs", + help="Files to output to object store", required=False) + parser.add_argument("-i", "--inputs", dest="inputs", + help="Files to pull in from parent node", required=False) parser.add_argument( "-p", "--user-volume-path", @@ -694,7 +741,8 @@ def parse_arguments(cls, args) -> dict: # set pipeline name as global pipeline_name = parsed_args.get("pipeline-name") # operation/node name is the basename of the non-suffixed filepath, set as global - operation_name = os.path.basename(os.path.splitext(parsed_args.get("filepath"))[0]) + operation_name = os.path.basename( + os.path.splitext(parsed_args.get("filepath"))[0]) return parsed_args @@ -715,7 +763,8 @@ def log_operation_info(cls, action_clause: str, duration_secs: Optional[float] = global pipeline_name, operation_name if enable_pipeline_info: duration_clause = f"({duration_secs:.3f} secs)" if duration_secs else "" - logger.info(f"'{pipeline_name}':'{operation_name}' - {action_clause} {duration_clause}") + logger.info( + 
f"'{pipeline_name}':'{operation_name}' - {action_clause} {duration_clause}") def main(): @@ -743,4 +792,4 @@ def main(): if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/runtimes/tensorflow/ubi9-python-3.9/utils/bootstrapper.py b/runtimes/tensorflow/ubi9-python-3.9/utils/bootstrapper.py index 723e9b975..8a053a129 100644 --- a/runtimes/tensorflow/ubi9-python-3.9/utils/bootstrapper.py +++ b/runtimes/tensorflow/ubi9-python-3.9/utils/bootstrapper.py @@ -46,7 +46,8 @@ F = TypeVar("F", bound="FileOpBase") logger = logging.getLogger("elyra") -enable_pipeline_info = os.getenv("ELYRA_ENABLE_PIPELINE_INFO", "true").lower() == "true" +enable_pipeline_info = os.getenv( + "ELYRA_ENABLE_PIPELINE_INFO", "true").lower() == "true" pipeline_name = None # global used in formatted logging operation_name = None # global used in formatted logging @@ -81,8 +82,10 @@ def __init__(self, **kwargs: Any) -> None: self.cos_endpoint = urlparse(self.input_params.get("cos-endpoint")) self.cos_bucket = self.input_params.get("cos-bucket") - self.parameter_pass_method = self.input_params.get("parameter_pass_method") - self.pipeline_param_dict = self.convert_param_str_to_dict(self.input_params.get("pipeline_parameters")) + self.parameter_pass_method = self.input_params.get( + "parameter_pass_method") + self.pipeline_param_dict = self.convert_param_str_to_dict( + self.input_params.get("pipeline_parameters")) # Infer secure from the endpoint's scheme. self.secure = self.cos_endpoint.scheme == "https" @@ -104,12 +107,14 @@ def __init__(self, **kwargs: Any) -> None: ) # get minio client - self.cos_client = minio.Minio(self.cos_endpoint.netloc, secure=self.secure, credentials=cred_provider) + self.cos_client = minio.Minio( + self.cos_endpoint.netloc, secure=self.secure, credentials=cred_provider) @abstractmethod def execute(self) -> None: """Execute the operation relative to derived class""" - raise NotImplementedError("Method 'execute()' must be implemented by subclasses!") + raise NotImplementedError( + "Method 'execute()' must be implemented by subclasses!") def process_dependencies(self) -> None: """Process dependencies @@ -180,7 +185,8 @@ def process_metrics_and_metadata(self) -> None: # output_path doesn't meet the requirements # treat this as a non-fatal error and log a warning logger.warning(f'Cannot create files in "{output_path}".') - OpUtil.log_operation_info("Aborted metrics and metadata processing", time.time() - t0) + OpUtil.log_operation_info( + "Aborted metrics and metadata processing", time.time() - t0) return # Name of the proprietary KFP UI metadata file. @@ -225,11 +231,13 @@ def process_metrics_and_metadata(self) -> None: except ValueError as ve: # The file content could not be parsed. Log a warning # and treat this as a non-fatal error. - logger.warning(f"Ignoring incompatible {str(src)} produced by {self.filepath}: {ve} {str(ve)}") + logger.warning( + f"Ignoring incompatible {str(src)} produced by {self.filepath}: {ve} {str(ve)}") except Exception as ex: # Something is wrong with the user-generated metadata file. # Log a warning and treat this as a non-fatal error. 
- logger.warning(f"Error processing {str(src)} produced by {self.filepath}: {ex} {str(ex)}") + logger.warning( + f"Error processing {str(src)} produced by {self.filepath}: {ex} {str(ex)}") # # Augment kfp_ui_metadata_filename with Elyra-specific information: @@ -251,7 +259,8 @@ def process_metrics_and_metadata(self) -> None: # Define HREF for COS bucket: # // bucket_url = urljoin( - urlunparse(self.cos_endpoint), f"{self.cos_bucket}/{self.input_params.get('cos-directory', '')}/" + urlunparse( + self.cos_endpoint), f"{self.cos_bucket}/{self.input_params.get('cos-directory', '')}/" ) # add Elyra metadata to 'outputs' @@ -292,7 +301,8 @@ def get_file_from_object_storage(self, file_to_get: str) -> None: object_to_get = self.get_object_storage_filename(file_to_get) t0 = time.time() - self.cos_client.fget_object(bucket_name=self.cos_bucket, object_name=object_to_get, file_path=file_to_get) + self.cos_client.fget_object( + bucket_name=self.cos_bucket, object_name=object_to_get, file_path=file_to_get) duration = time.time() - t0 OpUtil.log_operation_info( f"downloaded {file_to_get} from bucket: {self.cos_bucket}, object: {object_to_get}", duration @@ -311,7 +321,8 @@ def put_file_to_object_storage(self, file_to_upload: str, object_name: Optional[ object_to_upload = self.get_object_storage_filename(object_to_upload) t0 = time.time() - self.cos_client.fput_object(bucket_name=self.cos_bucket, object_name=object_to_upload, file_path=file_to_upload) + self.cos_client.fput_object( + bucket_name=self.cos_bucket, object_name=object_to_upload, file_path=file_to_upload) duration = time.time() - t0 OpUtil.log_operation_info( f"uploaded {file_to_upload} to bucket: {self.cos_bucket} object: {object_to_upload}", duration @@ -366,7 +377,8 @@ def execute(self) -> None: notebook_html = f"{notebook_name}.html" try: - OpUtil.log_operation_info(f"executing notebook using 'papermill {notebook} {notebook_output}'") + OpUtil.log_operation_info( + f"executing notebook using 'papermill {notebook} {notebook_output}'") t0 = time.time() # Include kernel selection in execution time kernel_name = NotebookFileOp.find_best_kernel(notebook) @@ -377,11 +389,13 @@ def execute(self) -> None: import papermill - papermill.execute_notebook(notebook, notebook_output, kernel_name=kernel_name, **kwargs) + papermill.execute_notebook( + notebook, notebook_output, kernel_name=kernel_name, **kwargs) duration = time.time() - t0 OpUtil.log_operation_info("notebook execution completed", duration) - NotebookFileOp.convert_notebook_to_html(notebook_output, notebook_html) + NotebookFileOp.convert_notebook_to_html( + notebook_output, notebook_html) self.put_file_to_object_storage(notebook_output, notebook) self.put_file_to_object_storage(notebook_html) self.process_outputs() @@ -389,7 +403,8 @@ def execute(self) -> None: # log in case of errors logger.error(f"Unexpected error: {sys.exc_info()[0]}") - NotebookFileOp.convert_notebook_to_html(notebook_output, notebook_html) + NotebookFileOp.convert_notebook_to_html( + notebook_output, notebook_html) self.put_file_to_object_storage(notebook_output, notebook) self.put_file_to_object_storage(notebook_html) raise ex @@ -405,7 +420,8 @@ def convert_notebook_to_html(notebook_file: str, html_file: str) -> str: import nbconvert import nbformat - OpUtil.log_operation_info(f"converting from {notebook_file} to {html_file}") + OpUtil.log_operation_info( + f"converting from {notebook_file} to {html_file}") t0 = time.time() nb = nbformat.read(notebook_file, as_version=4) html_exporter = nbconvert.HTMLExporter() @@ 
-415,7 +431,8 @@ def convert_notebook_to_html(notebook_file: str, html_file: str) -> str: f.close() duration = time.time() - t0 - OpUtil.log_operation_info(f"{notebook_file} converted to {html_file}", duration) + OpUtil.log_operation_info( + f"{notebook_file} converted to {html_file}", duration) return html_file @staticmethod @@ -476,7 +493,7 @@ def execute(self) -> None: try: OpUtil.log_operation_info( - f"executing python script using 'python3 {python_script}' to '{python_script_output}'" + f"executing python script using 'python3 {python_script}'" ) t0 = time.time() @@ -484,20 +501,30 @@ def execute(self) -> None: if self.parameter_pass_method == "env": self.set_parameters_in_env() - with open(python_script_output, "w") as log_file: - subprocess.run(run_args, stdout=log_file, stderr=subprocess.STDOUT, check=True) + logger.info( + "----------------------Python logs start----------------------") + process = subprocess.Popen( + run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + for line in iter(process.stdout.readline, b''): + sys.stdout.write(line.decode()) + + process.stdout.close() + return_code = process.wait() + logger.info( + "----------------------Python logs ends----------------------") + if return_code: + raise subprocess.CalledProcessError(return_code, run_args) duration = time.time() - t0 - OpUtil.log_operation_info("python script execution completed", duration) + OpUtil.log_operation_info( + "python script execution completed", duration) - self.put_file_to_object_storage(python_script_output, python_script_output) self.process_outputs() except Exception as ex: # log in case of errors logger.error(f"Unexpected error: {sys.exc_info()[0]}") logger.error(f"Error details: {ex}") - self.put_file_to_object_storage(python_script_output, python_script_output) raise ex @@ -511,27 +538,37 @@ def execute(self) -> None: r_script_output = f"{r_script_name}.log" try: - OpUtil.log_operation_info(f"executing R script using 'Rscript {r_script}' to '{r_script_output}'") + OpUtil.log_operation_info( + f"executing R script using 'Rscript {r_script}' to '{r_script_output}'") t0 = time.time() run_args = ["Rscript", r_script] if self.parameter_pass_method == "env": self.set_parameters_in_env() - with open(r_script_output, "w") as log_file: - subprocess.run(run_args, stdout=log_file, stderr=subprocess.STDOUT, check=True) + logger.info( + "----------------------R Script logs start----------------------") + process = subprocess.Popen( + run_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + for line in iter(process.stdout.readline, b''): + sys.stdout.write(line.decode()) + + process.stdout.close() + return_code = process.wait() + logger.info( + "----------------------R Script logs ends----------------------") + if return_code: + raise subprocess.CalledProcessError(return_code, run_args) duration = time.time() - t0 OpUtil.log_operation_info("R script execution completed", duration) - self.put_file_to_object_storage(r_script_output, r_script_output) self.process_outputs() except Exception as ex: # log in case of errors logger.error(f"Unexpected error: {sys.exc_info()[0]}") logger.error(f"Error details: {ex}") - self.put_file_to_object_storage(r_script_output, r_script_output) raise ex @@ -566,7 +603,8 @@ def package_install(cls, user_volume_path) -> None: ) continue if version.Version(ver) > version.Version(current_packages[package]): - logger.info(f"Updating {package} package from version {current_packages[package]} to {ver}...") + logger.info( + f"Updating {package} package from version 
{current_packages[package]} to {ver}...") to_install_list.append(f"{package}=={ver}") elif version.Version(ver) < version.Version(current_packages[package]): logger.info( @@ -574,7 +612,8 @@ def package_install(cls, user_volume_path) -> None: f"already installed. Skipping..." ) else: - logger.info(f"Package not found. Installing {package} package with version {ver}...") + logger.info( + f"Package not found. Installing {package} package with version {ver}...") to_install_list.append(f"{package}=={ver}") if to_install_list: @@ -582,7 +621,8 @@ def package_install(cls, user_volume_path) -> None: to_install_list.insert(0, f"--target={user_volume_path}") to_install_list.append("--no-cache-dir") - subprocess.run([sys.executable, "-m", "pip", "install"] + to_install_list, check=True) + subprocess.run([sys.executable, "-m", "pip", + "install"] + to_install_list, check=True) if user_volume_path: os.environ["PIP_CONFIG_FILE"] = f"{user_volume_path}/pip.conf" @@ -609,13 +649,17 @@ def package_list_to_dict(cls, filename: str) -> dict: for line in fh: if line[0] != "#": if " @ " in line: - package_name, package_version = line.strip("\n").split(sep=" @ ") + package_name, package_version = line.strip( + "\n").split(sep=" @ ") elif "===" in line: - package_name, package_version = line.strip("\n").split(sep="===") + package_name, package_version = line.strip( + "\n").split(sep="===") elif "==" in line: - package_name, package_version = line.strip("\n").split(sep="==") + package_name, package_version = line.strip( + "\n").split(sep="==") elif line.startswith("-e ") or line.startswith("--editable "): - package_name = line.strip("\n").replace("-e ", "").replace("--editable ", "") + package_name = line.strip("\n").replace( + "-e ", "").replace("--editable ", "") if "#egg=" in package_name: # editable package from version control system package_name = package_name.split("=")[-1] elif "/" in package_name: # editable package from local directory @@ -657,9 +701,12 @@ def parse_arguments(cls, args) -> dict: help="Archive containing notebook and dependency artifacts", required=True, ) - parser.add_argument("-f", "--file", dest="filepath", help="File to execute", required=True) - parser.add_argument("-o", "--outputs", dest="outputs", help="Files to output to object store", required=False) - parser.add_argument("-i", "--inputs", dest="inputs", help="Files to pull in from parent node", required=False) + parser.add_argument("-f", "--file", dest="filepath", + help="File to execute", required=True) + parser.add_argument("-o", "--outputs", dest="outputs", + help="Files to output to object store", required=False) + parser.add_argument("-i", "--inputs", dest="inputs", + help="Files to pull in from parent node", required=False) parser.add_argument( "-p", "--user-volume-path", @@ -694,7 +741,8 @@ def parse_arguments(cls, args) -> dict: # set pipeline name as global pipeline_name = parsed_args.get("pipeline-name") # operation/node name is the basename of the non-suffixed filepath, set as global - operation_name = os.path.basename(os.path.splitext(parsed_args.get("filepath"))[0]) + operation_name = os.path.basename( + os.path.splitext(parsed_args.get("filepath"))[0]) return parsed_args @@ -715,7 +763,8 @@ def log_operation_info(cls, action_clause: str, duration_secs: Optional[float] = global pipeline_name, operation_name if enable_pipeline_info: duration_clause = f"({duration_secs:.3f} secs)" if duration_secs else "" - logger.info(f"'{pipeline_name}':'{operation_name}' - {action_clause} {duration_clause}") + logger.info( + 
f"'{pipeline_name}':'{operation_name}' - {action_clause} {duration_clause}") def main(): @@ -743,4 +792,4 @@ def main(): if __name__ == "__main__": - main() \ No newline at end of file + main()