Skip to content

Commit

Permalink
Add option to append the run ID to Cloud Object Storage output paths
Browse files Browse the repository at this point in the history
  • Loading branch information
caponetto committed Jan 24, 2025
1 parent a74d345 commit 9c4c8eb
Show file tree
Hide file tree
Showing 14 changed files with 113 additions and 9 deletions.
14 changes: 14 additions & 0 deletions elyra/airflow/bootstrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def __init__(self, **kwargs: Any) -> None:
self.input_params = kwargs or []
self.cos_endpoint = urlparse(self.input_params.get("cos-endpoint"))
self.cos_bucket = self.input_params.get("cos-bucket")
self.append_run_id = self.input_params.get("cos-output-append-run-id")

# Infer secure from the endpoint's scheme.
self.secure = self.cos_endpoint.scheme == "https"
Expand Down Expand Up @@ -184,6 +185,10 @@ def put_file_to_object_storage(self, file_to_upload: str, object_name: Optional[
if not object_to_upload:
object_to_upload = file_to_upload

run_id = os.getenv("ELYRA_RUN_NAME")
if self.append_run_id and run_id:
object_to_upload = os.path.join(run_id, object_to_upload)

object_to_upload = self.get_object_storage_filename(object_to_upload)
t0 = time.time()
self.cos_client.fput_object(bucket_name=self.cos_bucket, object_name=object_to_upload, file_path=file_to_upload)
Expand Down Expand Up @@ -543,6 +548,15 @@ def parse_arguments(cls, args) -> dict:
help="Pipeline name",
required=True,
)
parser.add_argument(
"-a",
"--cos-output-append-run-id",
dest="cos-output-append-run-id",
type=bool,
help="Append run ID to the cloud object storage output path",
required=False,
action=argparse.BooleanOptionalAction,
)
parser.add_argument("-f", "--file", dest="filepath", help="File to execute", required=True)
parser.add_argument("-o", "--outputs", dest="outputs", help="Files to output to object store", required=False)
parser.add_argument("-i", "--inputs", dest="inputs", help="Files to pull in from parent node", required=False)
Expand Down
4 changes: 4 additions & 0 deletions elyra/airflow/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def __init__(
cos_endpoint: str,
cos_bucket: str,
cos_directory: str,
cos_output_append_run_id: bool,
cos_dependencies_archive: str,
inputs: Optional[List[str]] = None,
outputs: Optional[List[str]] = None,
Expand All @@ -65,6 +66,7 @@ def __init__(
:param :cos_endpoint: object storage endpoint e.g weaikish1.fyre.ibm.com:30442
:param :cos_bucket: bucket to retrieve archive from
:param :cos_directory: name of the directory in the object storage bucket to pull
:param :cos_output_append_run_id: whether to append the run id to the output file path
:param :cos_dependencies_archive: archive file name to get from object storage bucket e.g archive1.tar.gz
:param inputs: comma delimited list of files to be consumed/are required by the filename
:param outputs: comma delimited list of files produced by the filename
Expand All @@ -73,6 +75,7 @@ def __init__(
self.cos_endpoint = cos_endpoint
self.cos_bucket = cos_bucket
self.cos_directory = cos_directory
self.cos_output_append_run_id = cos_output_append_run_id
self.cos_dependencies_archive = cos_dependencies_archive
self.filename = filename
self.pipeline_name = pipeline_name
Expand Down Expand Up @@ -103,6 +106,7 @@ def container_cmd(self):
f"--cos-endpoint {self.cos_endpoint} "
f"--cos-bucket {self.cos_bucket} "
f"--cos-directory '{self.cos_directory}' "
f"{'--cos-output-append-run-id ' if self.cos_output_append_run_id else ''}"
f"--cos-dependencies-archive '{self.cos_dependencies_archive}' "
f"--file '{self.filename}' "
]
Expand Down
14 changes: 14 additions & 0 deletions elyra/kfp/bootstrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ def __init__(self, **kwargs: Any) -> None:
self.input_params = kwargs or {}
self.cos_endpoint = urlparse(self.input_params.get("cos-endpoint"))
self.cos_bucket = self.input_params.get("cos-bucket")
self.append_run_id = self.input_params.get("cos-output-append-run-id")

self.parameter_pass_method = self.input_params.get("parameter_pass_method")
self.pipeline_param_dict = self.convert_param_str_to_dict(self.input_params.get("pipeline_parameters"))
Expand Down Expand Up @@ -313,6 +314,10 @@ def put_file_to_object_storage(self, file_to_upload: str, object_name: Optional[
if not object_to_upload:
object_to_upload = file_to_upload

run_id = os.getenv("ELYRA_RUN_NAME")
if self.append_run_id and run_id:
object_to_upload = os.path.join(run_id, object_to_upload)

object_to_upload = self.get_object_storage_filename(object_to_upload)
t0 = time.time()
self.cos_client.fput_object(bucket_name=self.cos_bucket, object_name=object_to_upload, file_path=file_to_upload)
Expand Down Expand Up @@ -715,6 +720,15 @@ def parse_arguments(cls, args) -> dict:
help="Pipeline name",
required=True,
)
parser.add_argument(
"-a",
"--cos-output-append-run-id",
dest="cos-output-append-run-id",
type=bool,
help="Append run ID to the cloud object storage output path",
required=False,
action=argparse.BooleanOptionalAction,
)
parser.add_argument(
"-r",
"--pipeline-parameters",
Expand Down
2 changes: 2 additions & 0 deletions elyra/pipeline/airflow/airflow_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ def _cc_pipeline(self, pipeline: Pipeline, pipeline_name: str, pipeline_instance
artifact_object_prefix = join_paths(
pipeline.pipeline_properties.get(pipeline_constants.COS_OBJECT_PREFIX), pipeline_instance_id
)
cos_output_append_run_id = pipeline.pipeline_properties.get(pipeline_constants.COS_OUTPUT_APPEND_RUN_ID, False)

self.log_pipeline_info(
pipeline_name,
Expand Down Expand Up @@ -327,6 +328,7 @@ def _cc_pipeline(self, pipeline: Pipeline, pipeline_name: str, pipeline_instance
cos_endpoint=cos_endpoint,
cos_bucket=cos_bucket,
cos_directory=artifact_object_prefix,
cos_output_append_run_id=cos_output_append_run_id,
cos_dependencies_archive=operation_artifact_archive,
inputs=operation.inputs,
outputs=operation.outputs,
Expand Down
6 changes: 6 additions & 0 deletions elyra/pipeline/kfp/processor_kfp.py
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,9 @@ def _generate_workflow_tasks(
artifact_object_prefix = join_paths(
pipeline.pipeline_properties.get(pipeline_constants.COS_OBJECT_PREFIX), pipeline_instance_id
)
cos_output_append_run_id = pipeline.pipeline_properties.get(
pipeline_constants.COS_OUTPUT_APPEND_RUN_ID, False
)
# - load the generic component definition template
template_env = Environment(loader=PackageLoader("elyra", "templates/kubeflow/v2"))
generic_component_template = template_env.get_template("generic_component_definition_template.jinja2")
Expand Down Expand Up @@ -781,6 +784,7 @@ def _generate_workflow_tasks(
cos_endpoint=cos_endpoint,
cos_bucket=cos_bucket,
cos_directory=artifact_object_prefix,
cos_output_append_run_id=cos_output_append_run_id,
cos_dependencies_archive=self._get_dependency_archive_name(operation),
filename=operation.filename,
cos_inputs=operation.inputs,
Expand Down Expand Up @@ -1046,6 +1050,7 @@ def _compose_container_command_args(
cos_endpoint: str,
cos_bucket: str,
cos_directory: str,
cos_output_append_run_id: bool,
cos_dependencies_archive: str,
filename: str,
cos_inputs: Optional[List[str]] = [],
Expand Down Expand Up @@ -1119,6 +1124,7 @@ def _compose_container_command_args(
f"--cos-endpoint '{cos_endpoint}' "
f"--cos-bucket '{cos_bucket}' "
f"--cos-directory '{cos_directory}' "
f"{'--cos-output-append-run-id ' if cos_output_append_run_id else ''}"
f"--cos-dependencies-archive '{cos_dependencies_archive}' "
f"--file '{filename}' "
]
Expand Down
1 change: 1 addition & 0 deletions elyra/pipeline/pipeline_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@
DISABLE_NODE_CACHING = "disable_node_caching"
KUBERNETES_SHARED_MEM_SIZE = "kubernetes_shared_mem_size"
COS_OBJECT_PREFIX = "cos_object_prefix" # optional static prefix to be used when generating object name for cos storage
COS_OUTPUT_APPEND_RUN_ID = "cos_output_append_run_id" # optional flag to append run id to cos output path
9 changes: 9 additions & 0 deletions elyra/templates/pipeline/pipeline_properties_template.jinja2
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@
"ui:placeholder": "project/subproject"
}
},
"cos_output_append_run_id": {
"title": "Append Run ID to Output Path",
"type": "boolean",
"description": "If enabled, the Run ID will be appended to the output path in Object Storage.",
"uihints": {
"ui:widget": "checkbox",
"ui:help": "Useful for distinguishing outputs from different runs."
}
},
{% if elyra_owned_properties %}
"node_defaults_header": {
"type": "null",
Expand Down
25 changes: 25 additions & 0 deletions elyra/tests/airflow/test_airflow_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def test_fail_with_empty_string_as_filename():
cos_endpoint="http://testserver:32525",
cos_bucket="test_bucket",
cos_directory="test_directory",
cos_output_append_run_id=False,
cos_dependencies_archive="test_archive.tgz",
)
assert "You need to provide a filename for the operation." == str(error_info.value)
Expand All @@ -91,6 +92,7 @@ def test_build_cmd_with_inputs_and_outputs():
cos_endpoint="http://testserver:32525",
cos_bucket="test_bucket",
cos_directory="test_directory",
cos_output_append_run_id=False,
cos_dependencies_archive="test_archive.tgz",
inputs=pipeline_inputs,
outputs=pipeline_outputs,
Expand All @@ -106,3 +108,26 @@ def test_build_cmd_with_inputs_and_outputs():
assert arg_value == f"'{';'.join(pipeline_outputs)}'"
if "inputs" in arg:
assert arg_value == f"'{';'.join(pipeline_inputs)}'"


@pytest.mark.parametrize(
"cos_output_append_run_id, expected_in_cmd",
[
(False, False),
(True, True),
],
)
def test_cos_output_append_run_id(cos_output_append_run_id, expected_in_cmd):
boot_build = BootscriptBuilder(
filename="test_notebook.ipynb",
pipeline_name="test-pipeline",
cos_endpoint="http://testserver:32525",
cos_bucket="test_bucket",
cos_directory="test_directory",
cos_output_append_run_id=cos_output_append_run_id,
cos_dependencies_archive="test_archive.tgz",
)
if expected_in_cmd:
assert "--cos-output-append-run-id" in boot_build.container_cmd
else:
assert "--cos-output-append-run-id" not in boot_build.container_cmd
27 changes: 20 additions & 7 deletions elyra/tests/kfp/test_bootstrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@

ELYRA_ROOT_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
RESOURCES_DIR = os.path.join(ELYRA_ROOT_DIR, "elyra", "tests", "kfp", "resources")
ELYRA_RUN_ID_TEST = "9d8f5715-2e7c-4e64-8e34-35f510c12e66"


@pytest.fixture(scope="module", autouse=True)
Expand Down Expand Up @@ -114,6 +115,7 @@ def main_method_setup_execution(monkeypatch, s3_setup, tmpdir, argument_dict):
monkeypatch.setenv("AWS_ACCESS_KEY_ID", "minioadmin")
monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "minioadmin")
monkeypatch.setenv("TEST_ENV_VAR1", "test_env_var1")
monkeypatch.setenv("ELYRA_RUN_NAME", ELYRA_RUN_ID_TEST)

s3_setup.fput_object(
bucket_name=argument_dict["cos-bucket"],
Expand All @@ -137,22 +139,27 @@ def main_method_setup_execution(monkeypatch, s3_setup, tmpdir, argument_dict):
"test-archive.tgz",
"test-file.txt",
"test,file.txt",
]
test_output_file_list = [
"test-file/test-file-copy.txt",
"test-file/test,file/test,file-copy.txt",
"test-notebookA.ipynb",
"test-notebookA-output.ipynb",
"test-notebookA.html",
"test-notebookA-output.ipynb",
]
# Ensure working directory has all the files.
for file in test_file_list:
for file in test_file_list + test_output_file_list:
assert os.path.isfile(file)
for file in test_file_list:
assert s3_setup.stat_object(bucket_name=argument_dict["cos-bucket"], object_name="test-directory/" + file)
# Ensure upload directory has all the files EXCEPT the output notebook
# since it was it is uploaded as the input notebook (test-notebookA.ipynb)
# (which is included in the archive at start).
for file in test_file_list:
for file in test_output_file_list:
if file != "test-notebookA-output.ipynb":
run_id_prefix = f"{ELYRA_RUN_ID_TEST}/" if argument_dict.get("cos-output-append-run-id", False) else ""
assert s3_setup.stat_object(
bucket_name=argument_dict["cos-bucket"], object_name="test-directory/" + file
bucket_name=argument_dict["cos-bucket"], object_name=f"test-directory/{run_id_prefix}{file}"
)
if file == "test-notebookA.html":
with open("test-notebookA.html") as html_file:
Expand All @@ -178,11 +185,13 @@ def _get_operation_instance(monkeypatch, s3_setup):
return op


def test_main_method(monkeypatch, s3_setup, tmpdir):
@pytest.mark.parametrize("cos_output_append_run_id", [False, True])
def test_main_method(monkeypatch, s3_setup, tmpdir, cos_output_append_run_id):
argument_dict = {
"cos-endpoint": "http://" + MINIO_HOST_PORT,
"cos-bucket": "test-bucket",
"cos-directory": "test-directory",
"cos-output-append-run-id": cos_output_append_run_id,
"cos-dependencies-archive": "test-archive.tgz",
"filepath": os.path.join(RESOURCES_DIR, "test-notebookA.ipynb"),
"inputs": "test-file.txt;test,file.txt",
Expand All @@ -192,11 +201,13 @@ def test_main_method(monkeypatch, s3_setup, tmpdir):
main_method_setup_execution(monkeypatch, s3_setup, tmpdir, argument_dict)


def test_main_method_with_wildcard_outputs(monkeypatch, s3_setup, tmpdir):
@pytest.mark.parametrize("cos_output_append_run_id", [False, True])
def test_main_method_with_wildcard_outputs(monkeypatch, s3_setup, tmpdir, cos_output_append_run_id):
argument_dict = {
"cos-endpoint": "http://" + MINIO_HOST_PORT,
"cos-bucket": "test-bucket",
"cos-directory": "test-directory",
"cos-output-append-run-id": cos_output_append_run_id,
"cos-dependencies-archive": "test-archive.tgz",
"filepath": os.path.join(RESOURCES_DIR, "test-notebookA.ipynb"),
"inputs": "test-file.txt;test,file.txt",
Expand All @@ -206,11 +217,13 @@ def test_main_method_with_wildcard_outputs(monkeypatch, s3_setup, tmpdir):
main_method_setup_execution(monkeypatch, s3_setup, tmpdir, argument_dict)


def test_main_method_with_dir_outputs(monkeypatch, s3_setup, tmpdir):
@pytest.mark.parametrize("cos_output_append_run_id", [False, True])
def test_main_method_with_dir_outputs(monkeypatch, s3_setup, tmpdir, cos_output_append_run_id):
argument_dict = {
"cos-endpoint": "http://" + MINIO_HOST_PORT,
"cos-bucket": "test-bucket",
"cos-directory": "test-directory",
"cos-output-append-run-id": cos_output_append_run_id,
"cos-dependencies-archive": "test-archive.tgz",
"filepath": os.path.join(RESOURCES_DIR, "test-notebookA.ipynb"),
"inputs": "test-file.txt;test,file.txt",
Expand Down
7 changes: 7 additions & 0 deletions elyra/tests/pipeline/airflow/test_processor_airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from elyra.pipeline.parser import PipelineParser
from elyra.pipeline.pipeline import GenericOperation
from elyra.pipeline.pipeline_constants import COS_OBJECT_PREFIX
from elyra.pipeline.pipeline_constants import COS_OUTPUT_APPEND_RUN_ID
from elyra.pipeline.pipeline_constants import MOUNTED_VOLUMES
from elyra.pipeline.properties import ElyraProperty
from elyra.pipeline.runtime_type import RuntimeProcessorType
Expand Down Expand Up @@ -178,6 +179,12 @@ def test_create_file(monkeypatch, processor, parsed_pipeline, parsed_ordered_dic
cos_prefix = pipeline_json["pipelines"][0]["app_data"]["properties"]["pipeline_defaults"].get(COS_OBJECT_PREFIX)
assert cos_prefix == parsed_pipeline.pipeline_properties.get(COS_OBJECT_PREFIX)

# Ensure the value of COS_OUTPUT_APPEND_RUN_ID has been propagated to the Pipeline object appropriately
cos_output_append_run_id = pipeline_json["pipelines"][0]["app_data"]["properties"]["pipeline_defaults"].get(
COS_OUTPUT_APPEND_RUN_ID
)
assert cos_output_append_run_id == parsed_pipeline.pipeline_properties.get(COS_OUTPUT_APPEND_RUN_ID)

with tempfile.TemporaryDirectory() as temp_dir:
export_pipeline_output_path = os.path.join(temp_dir, f"{export_pipeline_name}.py")

Expand Down
5 changes: 5 additions & 0 deletions elyra/tests/pipeline/kfp/test_processor_kfp.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ def test_compose_container_command_args(processor: KfpPipelineProcessor):
cos_endpoint = "https://minio:9000"
cos_bucket = "test_bucket"
cos_directory = "a_dir"
cos_output_append_run_id = True
cos_dependencies_archive = "dummy-notebook-0815.tar.gz"
filename = "dummy-notebook.ipynb"

Expand All @@ -126,6 +127,7 @@ def test_compose_container_command_args(processor: KfpPipelineProcessor):
cos_endpoint=cos_endpoint,
cos_bucket=cos_bucket,
cos_directory=cos_directory,
cos_output_append_run_id=cos_output_append_run_id,
cos_dependencies_archive=cos_dependencies_archive,
filename=filename,
)
Expand All @@ -134,6 +136,7 @@ def test_compose_container_command_args(processor: KfpPipelineProcessor):
assert f"--cos-endpoint '{cos_endpoint}'" in command_args
assert f"--cos-bucket '{cos_bucket}'" in command_args
assert f"--cos-directory '{cos_directory}'" in command_args
assert "--cos-output-append-run-id" in command_args
assert f"--cos-dependencies-archive '{cos_dependencies_archive}'" in command_args
assert f"--file '{filename}'" in command_args

Expand All @@ -148,6 +151,7 @@ def test_compose_container_command_args(processor: KfpPipelineProcessor):
cos_endpoint=cos_endpoint,
cos_bucket=cos_bucket,
cos_directory=cos_directory,
cos_output_append_run_id=True,
cos_dependencies_archive=cos_dependencies_archive,
filename=filename,
cos_inputs=file_dependency,
Expand Down Expand Up @@ -199,6 +203,7 @@ def test_compose_container_command_args_invalid_dependency_filename(processor: K
cos_endpoint=cos_endpoint,
cos_bucket=cos_bucket,
cos_directory=cos_directory,
cos_output_append_run_id=True,
cos_dependencies_archive=cos_dependencies_archive,
filename=filename,
cos_inputs=file_dependency,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,8 @@
},
"properties": {
"pipeline_defaults": {
"cos_object_prefix": "test/prefix"
"cos_object_prefix": "test/prefix",
"cos_output_append_run_id": "true"
}
},
"version": 5,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@
"kubernetes_secrets": [],
"env_vars": [],
"runtime_image": "tensorflow/tensorflow:2.8.0",
"cos_object_prefix": "my/project"
"cos_object_prefix": "my/project",
"cos_output_append_run_id": "true"
},
"name": "kfp-one-node-generic",
"runtime": "Kubeflow Pipelines",
Expand Down
Loading

0 comments on commit 9c4c8eb

Please sign in to comment.