diff --git a/elyra/airflow/bootstrapper.py b/elyra/airflow/bootstrapper.py
index 4071de73c..f650cc31f 100644
--- a/elyra/airflow/bootstrapper.py
+++ b/elyra/airflow/bootstrapper.py
@@ -78,6 +78,7 @@ def __init__(self, **kwargs: Any) -> None:
         self.input_params = kwargs or []
         self.cos_endpoint = urlparse(self.input_params.get("cos-endpoint"))
         self.cos_bucket = self.input_params.get("cos-bucket")
+        self.append_run_id = self.input_params.get("cos-output-append-run-id")
 
         # Infer secure from the endpoint's scheme.
         self.secure = self.cos_endpoint.scheme == "https"
@@ -184,6 +185,10 @@ def put_file_to_object_storage(self, file_to_upload: str, object_name: Optional[
         if not object_to_upload:
             object_to_upload = file_to_upload
 
+        run_id = os.getenv("ELYRA_RUN_NAME")
+        if self.append_run_id and run_id:
+            object_to_upload = os.path.join(run_id, object_to_upload)
+
         object_to_upload = self.get_object_storage_filename(object_to_upload)
         t0 = time.time()
         self.cos_client.fput_object(bucket_name=self.cos_bucket, object_name=object_to_upload, file_path=file_to_upload)
@@ -543,6 +548,15 @@ def parse_arguments(cls, args) -> dict:
             help="Pipeline name",
             required=True,
         )
+        parser.add_argument(
+            "-a",
+            "--cos-output-append-run-id",
+            dest="cos-output-append-run-id",
+            type=bool,
+            help="Append run ID to the cloud object storage output path",
+            required=False,
+            action=argparse.BooleanOptionalAction,
+        )
         parser.add_argument("-f", "--file", dest="filepath", help="File to execute", required=True)
         parser.add_argument("-o", "--outputs", dest="outputs", help="Files to output to object store", required=False)
         parser.add_argument("-i", "--inputs", dest="inputs", help="Files to pull in from parent node", required=False)
diff --git a/elyra/airflow/operator.py b/elyra/airflow/operator.py
index 678de0988..b6f501a11 100644
--- a/elyra/airflow/operator.py
+++ b/elyra/airflow/operator.py
@@ -53,6 +53,7 @@ def __init__(
         cos_endpoint: str,
         cos_bucket: str,
         cos_directory: str,
+        cos_output_append_run_id: bool,
         cos_dependencies_archive: str,
         inputs: Optional[List[str]] = None,
         outputs: Optional[List[str]] = None,
@@ -65,6 +66,7 @@
         :param :cos_endpoint: object storage endpoint e.g weaikish1.fyre.ibm.com:30442
         :param :cos_bucket: bucket to retrieve archive from
         :param :cos_directory: name of the directory in the object storage bucket to pull
+        :param :cos_output_append_run_id: whether to append the run id to the output file path
         :param :cos_dependencies_archive: archive file name to get from object storage bucket e.g archive1.tar.gz
         :param inputs: comma delimited list of files to be consumed/are required by the filename
         :param outputs: comma delimited list of files produced by the filename
@@ -73,6 +75,7 @@
         self.cos_endpoint = cos_endpoint
         self.cos_bucket = cos_bucket
         self.cos_directory = cos_directory
+        self.cos_output_append_run_id = cos_output_append_run_id
         self.cos_dependencies_archive = cos_dependencies_archive
         self.filename = filename
         self.pipeline_name = pipeline_name
@@ -103,6 +106,7 @@ def container_cmd(self):
             f"--cos-endpoint {self.cos_endpoint} "
             f"--cos-bucket {self.cos_bucket} "
             f"--cos-directory '{self.cos_directory}' "
+            f"{'--cos-output-append-run-id ' if self.cos_output_append_run_id else ''}"
             f"--cos-dependencies-archive '{self.cos_dependencies_archive}' "
             f"--file '{self.filename}' "
         ]
diff --git a/elyra/kfp/bootstrapper.py b/elyra/kfp/bootstrapper.py
index 1c5baec09..6be45e47d 100644
--- a/elyra/kfp/bootstrapper.py
+++ b/elyra/kfp/bootstrapper.py
@@ -84,6 +84,7 @@ def __init__(self, **kwargs: Any) -> None:
         self.input_params = kwargs or {}
         self.cos_endpoint = urlparse(self.input_params.get("cos-endpoint"))
         self.cos_bucket = self.input_params.get("cos-bucket")
+        self.append_run_id = self.input_params.get("cos-output-append-run-id")
 
         self.parameter_pass_method = self.input_params.get("parameter_pass_method")
         self.pipeline_param_dict = self.convert_param_str_to_dict(self.input_params.get("pipeline_parameters"))
@@ -313,6 +314,10 @@ def put_file_to_object_storage(self, file_to_upload: str, object_name: Optional[
         if not object_to_upload:
             object_to_upload = file_to_upload
 
+        run_id = os.getenv("ELYRA_RUN_NAME")
+        if self.append_run_id and run_id:
+            object_to_upload = os.path.join(run_id, object_to_upload)
+
         object_to_upload = self.get_object_storage_filename(object_to_upload)
         t0 = time.time()
         self.cos_client.fput_object(bucket_name=self.cos_bucket, object_name=object_to_upload, file_path=file_to_upload)
@@ -715,6 +720,15 @@ def parse_arguments(cls, args) -> dict:
             help="Pipeline name",
             required=True,
         )
+        parser.add_argument(
+            "-a",
+            "--cos-output-append-run-id",
+            dest="cos-output-append-run-id",
+            type=bool,
+            help="Append run ID to the cloud object storage output path",
+            required=False,
+            action=argparse.BooleanOptionalAction,
+        )
         parser.add_argument(
             "-r",
             "--pipeline-parameters",
diff --git a/elyra/pipeline/airflow/airflow_processor.py b/elyra/pipeline/airflow/airflow_processor.py
index 82ab8a0b2..1ca94fdf6 100644
--- a/elyra/pipeline/airflow/airflow_processor.py
+++ b/elyra/pipeline/airflow/airflow_processor.py
@@ -255,6 +255,7 @@ def _cc_pipeline(self, pipeline: Pipeline, pipeline_name: str, pipeline_instance
         artifact_object_prefix = join_paths(
             pipeline.pipeline_properties.get(pipeline_constants.COS_OBJECT_PREFIX), pipeline_instance_id
         )
+        cos_output_append_run_id = pipeline.pipeline_properties.get(pipeline_constants.COS_OUTPUT_APPEND_RUN_ID, False)
 
         self.log_pipeline_info(
             pipeline_name,
@@ -327,6 +328,7 @@
                     cos_endpoint=cos_endpoint,
                     cos_bucket=cos_bucket,
                     cos_directory=artifact_object_prefix,
+                    cos_output_append_run_id=cos_output_append_run_id,
                     cos_dependencies_archive=operation_artifact_archive,
                     inputs=operation.inputs,
                     outputs=operation.outputs,
diff --git a/elyra/pipeline/kfp/processor_kfp.py b/elyra/pipeline/kfp/processor_kfp.py
index f83a5e889..1f16e2726 100644
--- a/elyra/pipeline/kfp/processor_kfp.py
+++ b/elyra/pipeline/kfp/processor_kfp.py
@@ -708,6 +708,9 @@ def _generate_workflow_tasks(
             artifact_object_prefix = join_paths(
                 pipeline.pipeline_properties.get(pipeline_constants.COS_OBJECT_PREFIX), pipeline_instance_id
             )
+            cos_output_append_run_id = pipeline.pipeline_properties.get(
+                pipeline_constants.COS_OUTPUT_APPEND_RUN_ID, False
+            )
             # - load the generic component definition template
             template_env = Environment(loader=PackageLoader("elyra", "templates/kubeflow/v2"))
             generic_component_template = template_env.get_template("generic_component_definition_template.jinja2")
@@ -781,6 +784,7 @@
                 cos_endpoint=cos_endpoint,
                 cos_bucket=cos_bucket,
                 cos_directory=artifact_object_prefix,
+                cos_output_append_run_id=cos_output_append_run_id,
                 cos_dependencies_archive=self._get_dependency_archive_name(operation),
                 filename=operation.filename,
                 cos_inputs=operation.inputs,
@@ -1046,6 +1050,7 @@ def _compose_container_command_args(
         cos_endpoint: str,
         cos_bucket: str,
         cos_directory: str,
+        cos_output_append_run_id: bool,
         cos_dependencies_archive: str,
         filename: str,
         cos_inputs: Optional[List[str]] = [],
@@ -1119,6 +1124,7 @@ def _compose_container_command_args(
             f"--cos-endpoint '{cos_endpoint}' "
             f"--cos-bucket '{cos_bucket}' "
             f"--cos-directory '{cos_directory}' "
+            f"{'--cos-output-append-run-id ' if cos_output_append_run_id else ''}"
             f"--cos-dependencies-archive '{cos_dependencies_archive}' "
             f"--file '{filename}' "
         ]
diff --git a/elyra/pipeline/pipeline_constants.py b/elyra/pipeline/pipeline_constants.py
index e80f5dbe1..5828880bb 100644
--- a/elyra/pipeline/pipeline_constants.py
+++ b/elyra/pipeline/pipeline_constants.py
@@ -26,3 +26,4 @@
 DISABLE_NODE_CACHING = "disable_node_caching"
 KUBERNETES_SHARED_MEM_SIZE = "kubernetes_shared_mem_size"
 COS_OBJECT_PREFIX = "cos_object_prefix"  # optional static prefix to be used when generating object name for cos storage
+COS_OUTPUT_APPEND_RUN_ID = "cos_output_append_run_id"  # optional flag to append run id to cos output path
diff --git a/elyra/templates/pipeline/pipeline_properties_template.jinja2 b/elyra/templates/pipeline/pipeline_properties_template.jinja2
index 43a4767d0..cbc41a246 100644
--- a/elyra/templates/pipeline/pipeline_properties_template.jinja2
+++ b/elyra/templates/pipeline/pipeline_properties_template.jinja2
@@ -35,6 +35,15 @@
         "ui:placeholder": "project/subproject"
       }
     },
+    "cos_output_append_run_id": {
+      "title": "Append Run ID to Output Path",
+      "type": "boolean",
+      "description": "If enabled, the Run ID will be appended to the output path in Object Storage.",
+      "uihints": {
+        "ui:widget": "checkbox",
+        "ui:help": "Useful for distinguishing outputs from different runs."
+      }
+    },
 {% if elyra_owned_properties %}
     "node_defaults_header": {
       "type": "null",
diff --git a/elyra/tests/airflow/test_airflow_operator.py b/elyra/tests/airflow/test_airflow_operator.py
index 967d23ce1..2926312a3 100644
--- a/elyra/tests/airflow/test_airflow_operator.py
+++ b/elyra/tests/airflow/test_airflow_operator.py
@@ -76,6 +76,7 @@ def test_fail_with_empty_string_as_filename():
             cos_endpoint="http://testserver:32525",
             cos_bucket="test_bucket",
             cos_directory="test_directory",
+            cos_output_append_run_id=False,
             cos_dependencies_archive="test_archive.tgz",
         )
     assert "You need to provide a filename for the operation." == str(error_info.value)
@@ -91,6 +92,7 @@ def test_build_cmd_with_inputs_and_outputs():
         cos_endpoint="http://testserver:32525",
         cos_bucket="test_bucket",
         cos_directory="test_directory",
+        cos_output_append_run_id=False,
         cos_dependencies_archive="test_archive.tgz",
         inputs=pipeline_inputs,
         outputs=pipeline_outputs,
@@ -106,3 +108,26 @@ def test_build_cmd_with_inputs_and_outputs():
             assert arg_value == f"'{';'.join(pipeline_outputs)}'"
         if "inputs" in arg:
             assert arg_value == f"'{';'.join(pipeline_inputs)}'"
+
+
+@pytest.mark.parametrize(
+    "cos_output_append_run_id, expected_in_cmd",
+    [
+        (False, False),
+        (True, True),
+    ],
+)
+def test_cos_output_append_run_id(cos_output_append_run_id, expected_in_cmd):
+    boot_build = BootscriptBuilder(
+        filename="test_notebook.ipynb",
+        pipeline_name="test-pipeline",
+        cos_endpoint="http://testserver:32525",
+        cos_bucket="test_bucket",
+        cos_directory="test_directory",
+        cos_output_append_run_id=cos_output_append_run_id,
+        cos_dependencies_archive="test_archive.tgz",
+    )
+    if expected_in_cmd:
+        assert "--cos-output-append-run-id" in boot_build.container_cmd
+    else:
+        assert "--cos-output-append-run-id" not in boot_build.container_cmd
diff --git a/elyra/tests/kfp/test_bootstrapper.py b/elyra/tests/kfp/test_bootstrapper.py
index 919c0f886..6559fc5c1 100644
--- a/elyra/tests/kfp/test_bootstrapper.py
+++ b/elyra/tests/kfp/test_bootstrapper.py
@@ -49,6 +49,7 @@
 
 ELYRA_ROOT_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
 RESOURCES_DIR = os.path.join(ELYRA_ROOT_DIR, "elyra", "tests", "kfp", "resources")
+ELYRA_RUN_ID_TEST = "9d8f5715-2e7c-4e64-8e34-35f510c12e66"
 
 
 @pytest.fixture(scope="module", autouse=True)
@@ -114,6 +115,7 @@ def main_method_setup_execution(monkeypatch, s3_setup, tmpdir, argument_dict):
     monkeypatch.setenv("AWS_ACCESS_KEY_ID", "minioadmin")
     monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "minioadmin")
     monkeypatch.setenv("TEST_ENV_VAR1", "test_env_var1")
+    monkeypatch.setenv("ELYRA_RUN_NAME", ELYRA_RUN_ID_TEST)
 
     s3_setup.fput_object(
         bucket_name=argument_dict["cos-bucket"],
@@ -137,22 +139,27 @@
             "test-archive.tgz",
             "test-file.txt",
             "test,file.txt",
+        ]
+        test_output_file_list = [
             "test-file/test-file-copy.txt",
             "test-file/test,file/test,file-copy.txt",
             "test-notebookA.ipynb",
-            "test-notebookA-output.ipynb",
             "test-notebookA.html",
+            "test-notebookA-output.ipynb",
         ]
         # Ensure working directory has all the files.
-        for file in test_file_list:
+        for file in test_file_list + test_output_file_list:
             assert os.path.isfile(file)
+        for file in test_file_list:
+            assert s3_setup.stat_object(bucket_name=argument_dict["cos-bucket"], object_name="test-directory/" + file)
         # Ensure upload directory has all the files EXCEPT the output notebook
         # since it was it is uploaded as the input notebook (test-notebookA.ipynb)
         # (which is included in the archive at start).
-        for file in test_file_list:
+        for file in test_output_file_list:
             if file != "test-notebookA-output.ipynb":
+                run_id_prefix = f"{ELYRA_RUN_ID_TEST}/" if argument_dict.get("cos-output-append-run-id", False) else ""
                 assert s3_setup.stat_object(
-                    bucket_name=argument_dict["cos-bucket"], object_name="test-directory/" + file
+                    bucket_name=argument_dict["cos-bucket"], object_name=f"test-directory/{run_id_prefix}{file}"
                 )
                 if file == "test-notebookA.html":
                     with open("test-notebookA.html") as html_file:
@@ -178,11 +185,13 @@ def _get_operation_instance(monkeypatch, s3_setup):
     return op
 
 
-def test_main_method(monkeypatch, s3_setup, tmpdir):
+@pytest.mark.parametrize("cos_output_append_run_id", [False, True])
+def test_main_method(monkeypatch, s3_setup, tmpdir, cos_output_append_run_id):
     argument_dict = {
         "cos-endpoint": "http://" + MINIO_HOST_PORT,
         "cos-bucket": "test-bucket",
         "cos-directory": "test-directory",
+        "cos-output-append-run-id": cos_output_append_run_id,
         "cos-dependencies-archive": "test-archive.tgz",
         "filepath": os.path.join(RESOURCES_DIR, "test-notebookA.ipynb"),
         "inputs": "test-file.txt;test,file.txt",
@@ -192,11 +201,13 @@
     main_method_setup_execution(monkeypatch, s3_setup, tmpdir, argument_dict)
 
 
-def test_main_method_with_wildcard_outputs(monkeypatch, s3_setup, tmpdir):
+@pytest.mark.parametrize("cos_output_append_run_id", [False, True])
+def test_main_method_with_wildcard_outputs(monkeypatch, s3_setup, tmpdir, cos_output_append_run_id):
     argument_dict = {
         "cos-endpoint": "http://" + MINIO_HOST_PORT,
         "cos-bucket": "test-bucket",
         "cos-directory": "test-directory",
+        "cos-output-append-run-id": cos_output_append_run_id,
         "cos-dependencies-archive": "test-archive.tgz",
         "filepath": os.path.join(RESOURCES_DIR, "test-notebookA.ipynb"),
         "inputs": "test-file.txt;test,file.txt",
@@ -206,11 +217,13 @@
     main_method_setup_execution(monkeypatch, s3_setup, tmpdir, argument_dict)
 
 
-def test_main_method_with_dir_outputs(monkeypatch, s3_setup, tmpdir):
+@pytest.mark.parametrize("cos_output_append_run_id", [False, True])
+def test_main_method_with_dir_outputs(monkeypatch, s3_setup, tmpdir, cos_output_append_run_id):
     argument_dict = {
         "cos-endpoint": "http://" + MINIO_HOST_PORT,
         "cos-bucket": "test-bucket",
         "cos-directory": "test-directory",
+        "cos-output-append-run-id": cos_output_append_run_id,
         "cos-dependencies-archive": "test-archive.tgz",
         "filepath": os.path.join(RESOURCES_DIR, "test-notebookA.ipynb"),
         "inputs": "test-file.txt;test,file.txt",
diff --git a/elyra/tests/pipeline/airflow/test_processor_airflow.py b/elyra/tests/pipeline/airflow/test_processor_airflow.py
index e2fddc8c5..02994b18d 100644
--- a/elyra/tests/pipeline/airflow/test_processor_airflow.py
+++ b/elyra/tests/pipeline/airflow/test_processor_airflow.py
@@ -31,6 +31,7 @@
 from elyra.pipeline.parser import PipelineParser
 from elyra.pipeline.pipeline import GenericOperation
 from elyra.pipeline.pipeline_constants import COS_OBJECT_PREFIX
+from elyra.pipeline.pipeline_constants import COS_OUTPUT_APPEND_RUN_ID
 from elyra.pipeline.pipeline_constants import MOUNTED_VOLUMES
 from elyra.pipeline.properties import ElyraProperty
 from elyra.pipeline.runtime_type import RuntimeProcessorType
@@ -178,6 +179,12 @@ def test_create_file(monkeypatch, processor, parsed_pipeline, parsed_ordered_dic
 
     cos_prefix = pipeline_json["pipelines"][0]["app_data"]["properties"]["pipeline_defaults"].get(COS_OBJECT_PREFIX)
     assert cos_prefix == parsed_pipeline.pipeline_properties.get(COS_OBJECT_PREFIX)
+    # Ensure the value of COS_OUTPUT_APPEND_RUN_ID has been propagated to the Pipeline object appropriately
+    cos_output_append_run_id = pipeline_json["pipelines"][0]["app_data"]["properties"]["pipeline_defaults"].get(
+        COS_OUTPUT_APPEND_RUN_ID
+    )
+    assert cos_output_append_run_id == parsed_pipeline.pipeline_properties.get(COS_OUTPUT_APPEND_RUN_ID)
+
     with tempfile.TemporaryDirectory() as temp_dir:
         export_pipeline_output_path = os.path.join(temp_dir, f"{export_pipeline_name}.py")
diff --git a/elyra/tests/pipeline/kfp/test_processor_kfp.py b/elyra/tests/pipeline/kfp/test_processor_kfp.py
index 0cd133327..a8a6bd93c 100644
--- a/elyra/tests/pipeline/kfp/test_processor_kfp.py
+++ b/elyra/tests/pipeline/kfp/test_processor_kfp.py
@@ -118,6 +118,7 @@ def test_compose_container_command_args(processor: KfpPipelineProcessor):
     cos_endpoint = "https://minio:9000"
     cos_bucket = "test_bucket"
     cos_directory = "a_dir"
+    cos_output_append_run_id = True
     cos_dependencies_archive = "dummy-notebook-0815.tar.gz"
     filename = "dummy-notebook.ipynb"
 
@@ -126,6 +127,7 @@
         cos_endpoint=cos_endpoint,
         cos_bucket=cos_bucket,
         cos_directory=cos_directory,
+        cos_output_append_run_id=cos_output_append_run_id,
         cos_dependencies_archive=cos_dependencies_archive,
         filename=filename,
     )
@@ -134,6 +136,7 @@
     assert f"--cos-endpoint '{cos_endpoint}'" in command_args
     assert f"--cos-bucket '{cos_bucket}'" in command_args
     assert f"--cos-directory '{cos_directory}'" in command_args
+    assert "--cos-output-append-run-id" in command_args
     assert f"--cos-dependencies-archive '{cos_dependencies_archive}'" in command_args
     assert f"--file '{filename}'" in command_args
 
@@ -148,6 +151,7 @@
         cos_endpoint=cos_endpoint,
         cos_bucket=cos_bucket,
         cos_directory=cos_directory,
+        cos_output_append_run_id=True,
         cos_dependencies_archive=cos_dependencies_archive,
         filename=filename,
         cos_inputs=file_dependency,
@@ -199,6 +203,7 @@ def test_compose_container_command_args_invalid_dependency_filename(processor: K
         cos_endpoint=cos_endpoint,
         cos_bucket=cos_bucket,
         cos_directory=cos_directory,
+        cos_output_append_run_id=True,
         cos_dependencies_archive=cos_dependencies_archive,
         filename=filename,
         cos_inputs=file_dependency,
diff --git a/elyra/tests/pipeline/resources/sample_pipelines/pipeline_dependency_complex.json b/elyra/tests/pipeline/resources/sample_pipelines/pipeline_dependency_complex.json
index b64219870..b4f49dad1 100644
--- a/elyra/tests/pipeline/resources/sample_pipelines/pipeline_dependency_complex.json
+++ b/elyra/tests/pipeline/resources/sample_pipelines/pipeline_dependency_complex.json
@@ -634,7 +634,8 @@
         },
         "properties": {
           "pipeline_defaults": {
-            "cos_object_prefix": "test/prefix"
+            "cos_object_prefix": "test/prefix",
+            "cos_output_append_run_id": "true"
           }
         },
         "version": 5,
diff --git a/elyra/tests/pipeline/resources/test_pipelines/kfp/kfp-one-node-generic.pipeline b/elyra/tests/pipeline/resources/test_pipelines/kfp/kfp-one-node-generic.pipeline
index 83fb3d66e..1e85c51d2 100644
--- a/elyra/tests/pipeline/resources/test_pipelines/kfp/kfp-one-node-generic.pipeline
+++ b/elyra/tests/pipeline/resources/test_pipelines/kfp/kfp-one-node-generic.pipeline
@@ -96,7 +96,8 @@
             "kubernetes_secrets": [],
             "env_vars": [],
             "runtime_image": "tensorflow/tensorflow:2.8.0",
-            "cos_object_prefix": "my/project"
+            "cos_object_prefix": "my/project",
+            "cos_output_append_run_id": "true"
           },
           "name": "kfp-one-node-generic",
           "runtime": "Kubeflow Pipelines",
diff --git a/elyra/tests/pipeline/test_handlers.py b/elyra/tests/pipeline/test_handlers.py
index b937b86fa..aa7acb4a1 100644
--- a/elyra/tests/pipeline/test_handlers.py
+++ b/elyra/tests/pipeline/test_handlers.py
@@ -28,6 +28,7 @@
 from elyra.pipeline.parser import PipelineParser
 from elyra.pipeline.pipeline_constants import (
     COS_OBJECT_PREFIX,
+    COS_OUTPUT_APPEND_RUN_ID,
     DISABLE_NODE_CACHING,
     ENV_VARIABLES,
     KUBERNETES_POD_ANNOTATIONS,
@@ -266,6 +267,7 @@ async def test_get_pipeline_properties_definition(jp_fetch):
 
     default_properties = [
         COS_OBJECT_PREFIX,
+        COS_OUTPUT_APPEND_RUN_ID,
         RUNTIME_IMAGE,
         ENV_VARIABLES,
         KUBERNETES_SECRETS,