From 2fe6c5f5476245eeff269d2fb50990e0f89a0b85 Mon Sep 17 00:00:00 2001 From: janasangeetha Date: Mon, 4 Nov 2024 12:14:45 +0530 Subject: [PATCH 1/8] Fix Ruff rule B027 --- .../portable/base_executor_operator.py | 25 +++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/tfx/orchestration/portable/base_executor_operator.py b/tfx/orchestration/portable/base_executor_operator.py index 2a9f36a202..9c14839360 100644 --- a/tfx/orchestration/portable/base_executor_operator.py +++ b/tfx/orchestration/portable/base_executor_operator.py @@ -22,8 +22,7 @@ from google.protobuf import message - -class BaseExecutorOperator(abc.ABC): +class ParentBaseExecutorOperator(abc.ABC): """The base class of all executor operators.""" SUPPORTED_EXECUTOR_SPEC_TYPE = abc_utils.abstract_property() @@ -84,6 +83,28 @@ def with_execution_watcher( self._execution_watcher_address = execution_watcher_address return self + @abc.abstractmethod + def handle_stop(self) -> None: + """Executor Operator specific logic to clean up after it is stopped.""" + pass + +class BaseExecutorOperator(ParentBaseExecutorOperator): + """The child class for all abstract methods.""" + + def run_executor( + self, + execution_info: data_types.ExecutionInfo, + ) -> Optional[execution_result_pb2.ExecutorOutput]: + """Invokes the executor with inputs provided by the Launcher. + + Args: + execution_info: A wrapper of the info needed by this execution. + + Returns: + The output from executor. + """ + pass + def handle_stop(self) -> None: """Executor Operator specific logic to clean up after it is stopped.""" pass From 83abbc0c796b201e2225be4f4b78edc02fbaee69 Mon Sep 17 00:00:00 2001 From: janasangeetha Date: Mon, 4 Nov 2024 14:08:40 +0530 Subject: [PATCH 2/8] Fix Ruff rule B027 --- tfx/tools/cli/handler/dag_runner_patcher.py | 22 ++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/tfx/tools/cli/handler/dag_runner_patcher.py b/tfx/tools/cli/handler/dag_runner_patcher.py index 924c0799bf..a57ca3beed 100644 --- a/tfx/tools/cli/handler/dag_runner_patcher.py +++ b/tfx/tools/cli/handler/dag_runner_patcher.py @@ -24,7 +24,7 @@ from tfx.proto.orchestration import pipeline_pb2 -class DagRunnerPatcher(abc.ABC): +class ParentDagRunnerPatcher(abc.ABC): """Abstract base class for Patchers for various "DagRunner"s. These patcher classes "decorate" the `run` function of the DagRunners. @@ -56,11 +56,13 @@ def __init__(self, call_real_run=True): self._run_called = False self._call_real_run = call_real_run + @abc.abstractmethod def _before_run(self, runner: tfx_runner.TfxRunner, pipeline: Union[pipeline_pb2.Pipeline, tfx_pipeline.Pipeline], context: MutableMapping[str, Any]) -> None: pass + @abc.abstractmethod def _after_run(self, runner: tfx_runner.TfxRunner, pipeline: Union[pipeline_pb2.Pipeline, tfx_pipeline.Pipeline], context: MutableMapping[str, Any]) -> None: @@ -135,3 +137,21 @@ def wrapper(*args, **kwargs): return result return wrapper + +class DagRunnerPatcher(ParentDagRunnerPatcher): + """The child class for all abstract methods.""" + + def _before_run(self, runner: tfx_runner.TfxRunner, + pipeline: Union[pipeline_pb2.Pipeline, tfx_pipeline.Pipeline], + context: MutableMapping[str, Any]) -> None: + pass + + def _after_run(self, runner: tfx_runner.TfxRunner, + pipeline: Union[pipeline_pb2.Pipeline, tfx_pipeline.Pipeline], + context: MutableMapping[str, Any]) -> None: + pass + + def get_runner_class( + self + ) -> Union[Type[tfx_runner.TfxRunner], Type[portable_tfx_runner.TfxRunner]]: + raise NotImplementedError() From e438a8bc9c444c78f6aaa9a6299dca3cd03edfea Mon Sep 17 00:00:00 2001 From: janasangeetha Date: Mon, 4 Nov 2024 18:57:37 +0530 Subject: [PATCH 3/8] Fix Ruff rule B024 --- tfx/types/system_artifacts.py | 2 +- tfx/types/system_executions.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tfx/types/system_artifacts.py b/tfx/types/system_artifacts.py index ce960ced4a..141efdbba1 100644 --- a/tfx/types/system_artifacts.py +++ b/tfx/types/system_artifacts.py @@ -30,7 +30,7 @@ class SystemArtifact(abc.ABC): The subclasses, e.g, Dataset, Model, Statistics, e.t.c, match the MLMD types from third_party/ml_metadata/metadata_store/mlmd_types.py. """ - + # noqa: B024 # MLMD system base type enum. Override it when creating subclasses. MLMD_SYSTEM_BASE_TYPE = None diff --git a/tfx/types/system_executions.py b/tfx/types/system_executions.py index 5ec827e181..c12943d113 100644 --- a/tfx/types/system_executions.py +++ b/tfx/types/system_executions.py @@ -30,7 +30,7 @@ class SystemExecution(abc.ABC): The subclasses, e.g, Train, Transform, Process, e.t.c, match the MLMD types from third_party/ml_metadata/metadata_store/mlmd_types.py. """ - + # noqa: B024 # MLMD system base type enum. Override it when creating subclasses. MLMD_SYSTEM_BASE_TYPE = None From e01bc0f2b90b3f7d8b8cdc3cc431a8e316833963 Mon Sep 17 00:00:00 2001 From: janasangeetha Date: Tue, 5 Nov 2024 12:23:44 +0530 Subject: [PATCH 4/8] Fix Ruff rules --- .../portable/base_executor_operator.py | 29 +++---------------- tfx/tools/cli/handler/dag_runner_patcher.py | 26 ++--------------- tfx/types/system_artifacts.py | 3 +- tfx/types/system_executions.py | 3 +- 4 files changed, 9 insertions(+), 52 deletions(-) diff --git a/tfx/orchestration/portable/base_executor_operator.py b/tfx/orchestration/portable/base_executor_operator.py index 9c14839360..05b9b5cc7b 100644 --- a/tfx/orchestration/portable/base_executor_operator.py +++ b/tfx/orchestration/portable/base_executor_operator.py @@ -22,7 +22,8 @@ from google.protobuf import message -class ParentBaseExecutorOperator(abc.ABC): + +class BaseExecutorOperator(abc.ABC): """The base class of all executor operators.""" SUPPORTED_EXECUTOR_SPEC_TYPE = abc_utils.abstract_property() @@ -83,28 +84,6 @@ def with_execution_watcher( self._execution_watcher_address = execution_watcher_address return self - @abc.abstractmethod - def handle_stop(self) -> None: + def handle_stop(self) -> None:# noqa: B027 """Executor Operator specific logic to clean up after it is stopped.""" - pass - -class BaseExecutorOperator(ParentBaseExecutorOperator): - """The child class for all abstract methods.""" - - def run_executor( - self, - execution_info: data_types.ExecutionInfo, - ) -> Optional[execution_result_pb2.ExecutorOutput]: - """Invokes the executor with inputs provided by the Launcher. - - Args: - execution_info: A wrapper of the info needed by this execution. - - Returns: - The output from executor. - """ - pass - - def handle_stop(self) -> None: - """Executor Operator specific logic to clean up after it is stopped.""" - pass + pass \ No newline at end of file diff --git a/tfx/tools/cli/handler/dag_runner_patcher.py b/tfx/tools/cli/handler/dag_runner_patcher.py index a57ca3beed..fae34b3abf 100644 --- a/tfx/tools/cli/handler/dag_runner_patcher.py +++ b/tfx/tools/cli/handler/dag_runner_patcher.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. """Base class to patch DagRunner classes in TFX CLI.""" - +#ruff: noqa: B027 import abc import contextlib import functools @@ -24,7 +24,7 @@ from tfx.proto.orchestration import pipeline_pb2 -class ParentDagRunnerPatcher(abc.ABC): +class DagRunnerPatcher(abc.ABC): """Abstract base class for Patchers for various "DagRunner"s. These patcher classes "decorate" the `run` function of the DagRunners. @@ -56,13 +56,11 @@ def __init__(self, call_real_run=True): self._run_called = False self._call_real_run = call_real_run - @abc.abstractmethod def _before_run(self, runner: tfx_runner.TfxRunner, pipeline: Union[pipeline_pb2.Pipeline, tfx_pipeline.Pipeline], context: MutableMapping[str, Any]) -> None: pass - @abc.abstractmethod def _after_run(self, runner: tfx_runner.TfxRunner, pipeline: Union[pipeline_pb2.Pipeline, tfx_pipeline.Pipeline], context: MutableMapping[str, Any]) -> None: @@ -136,22 +134,4 @@ def wrapper(*args, **kwargs): self._after_run(runner, pipeline, self._context) return result - return wrapper - -class DagRunnerPatcher(ParentDagRunnerPatcher): - """The child class for all abstract methods.""" - - def _before_run(self, runner: tfx_runner.TfxRunner, - pipeline: Union[pipeline_pb2.Pipeline, tfx_pipeline.Pipeline], - context: MutableMapping[str, Any]) -> None: - pass - - def _after_run(self, runner: tfx_runner.TfxRunner, - pipeline: Union[pipeline_pb2.Pipeline, tfx_pipeline.Pipeline], - context: MutableMapping[str, Any]) -> None: - pass - - def get_runner_class( - self - ) -> Union[Type[tfx_runner.TfxRunner], Type[portable_tfx_runner.TfxRunner]]: - raise NotImplementedError() + return wrapper \ No newline at end of file diff --git a/tfx/types/system_artifacts.py b/tfx/types/system_artifacts.py index 141efdbba1..91b0e12783 100644 --- a/tfx/types/system_artifacts.py +++ b/tfx/types/system_artifacts.py @@ -21,7 +21,7 @@ from ml_metadata.metadata_store import mlmd_types -class SystemArtifact(abc.ABC): +class SystemArtifact(abc.ABC):# noqa: B024 """TFX system artifact base class. A user may create a subclass of SystemArtifact and override the @@ -30,7 +30,6 @@ class SystemArtifact(abc.ABC): The subclasses, e.g, Dataset, Model, Statistics, e.t.c, match the MLMD types from third_party/ml_metadata/metadata_store/mlmd_types.py. """ - # noqa: B024 # MLMD system base type enum. Override it when creating subclasses. MLMD_SYSTEM_BASE_TYPE = None diff --git a/tfx/types/system_executions.py b/tfx/types/system_executions.py index c12943d113..611a7529d4 100644 --- a/tfx/types/system_executions.py +++ b/tfx/types/system_executions.py @@ -21,7 +21,7 @@ from ml_metadata.metadata_store import mlmd_types -class SystemExecution(abc.ABC): +class SystemExecution(abc.ABC):# noqa: B024 """TFX system execution base class. A user may create a subclass of SystemExecution and override the @@ -30,7 +30,6 @@ class SystemExecution(abc.ABC): The subclasses, e.g, Train, Transform, Process, e.t.c, match the MLMD types from third_party/ml_metadata/metadata_store/mlmd_types.py. """ - # noqa: B024 # MLMD system base type enum. Override it when creating subclasses. MLMD_SYSTEM_BASE_TYPE = None From 675901737bf77a0630cbd1bbc6fad1505d6f91f4 Mon Sep 17 00:00:00 2001 From: janasangeetha Date: Tue, 5 Nov 2024 12:31:37 +0530 Subject: [PATCH 5/8] Added new line at EOF --- tfx/orchestration/portable/base_executor_operator.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tfx/orchestration/portable/base_executor_operator.py b/tfx/orchestration/portable/base_executor_operator.py index 05b9b5cc7b..9a36e877de 100644 --- a/tfx/orchestration/portable/base_executor_operator.py +++ b/tfx/orchestration/portable/base_executor_operator.py @@ -86,4 +86,5 @@ def with_execution_watcher( def handle_stop(self) -> None:# noqa: B027 """Executor Operator specific logic to clean up after it is stopped.""" - pass \ No newline at end of file + pass + \ No newline at end of file From dfb5516a3441d0da17233ba81141fa59fde7869a Mon Sep 17 00:00:00 2001 From: janasangeetha Date: Tue, 5 Nov 2024 12:33:17 +0530 Subject: [PATCH 6/8] Added new line at EOF --- tfx/tools/cli/handler/dag_runner_patcher.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tfx/tools/cli/handler/dag_runner_patcher.py b/tfx/tools/cli/handler/dag_runner_patcher.py index fae34b3abf..ff8e108bcc 100644 --- a/tfx/tools/cli/handler/dag_runner_patcher.py +++ b/tfx/tools/cli/handler/dag_runner_patcher.py @@ -134,4 +134,5 @@ def wrapper(*args, **kwargs): self._after_run(runner, pipeline, self._context) return result - return wrapper \ No newline at end of file + return wrapper + \ No newline at end of file From 5ec4d6cef8e3110e1075c580e561ab367c888bb6 Mon Sep 17 00:00:00 2001 From: janasangeetha Date: Tue, 5 Nov 2024 12:40:09 +0530 Subject: [PATCH 7/8] Fix lint errors --- tfx/orchestration/portable/base_executor_operator.py | 1 - tfx/tools/cli/handler/dag_runner_patcher.py | 1 - 2 files changed, 2 deletions(-) diff --git a/tfx/orchestration/portable/base_executor_operator.py b/tfx/orchestration/portable/base_executor_operator.py index 9a36e877de..fd4d362811 100644 --- a/tfx/orchestration/portable/base_executor_operator.py +++ b/tfx/orchestration/portable/base_executor_operator.py @@ -87,4 +87,3 @@ def with_execution_watcher( def handle_stop(self) -> None:# noqa: B027 """Executor Operator specific logic to clean up after it is stopped.""" pass - \ No newline at end of file diff --git a/tfx/tools/cli/handler/dag_runner_patcher.py b/tfx/tools/cli/handler/dag_runner_patcher.py index ff8e108bcc..36e645e343 100644 --- a/tfx/tools/cli/handler/dag_runner_patcher.py +++ b/tfx/tools/cli/handler/dag_runner_patcher.py @@ -135,4 +135,3 @@ def wrapper(*args, **kwargs): return result return wrapper - \ No newline at end of file From c034df5104a51d339529a41753532fab5d935c94 Mon Sep 17 00:00:00 2001 From: janasangeetha Date: Wed, 6 Nov 2024 09:48:54 +0530 Subject: [PATCH 8/8] Fix ruff rules --- tfx/orchestration/portable/base_executor_operator.py | 2 +- tfx/tools/cli/handler/dag_runner_patcher.py | 4 ++-- tfx/types/system_artifacts.py | 2 +- tfx/types/system_executions.py | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/tfx/orchestration/portable/base_executor_operator.py b/tfx/orchestration/portable/base_executor_operator.py index fd4d362811..88061b2157 100644 --- a/tfx/orchestration/portable/base_executor_operator.py +++ b/tfx/orchestration/portable/base_executor_operator.py @@ -84,6 +84,6 @@ def with_execution_watcher( self._execution_watcher_address = execution_watcher_address return self - def handle_stop(self) -> None:# noqa: B027 + def handle_stop(self) -> None: # noqa: B027 """Executor Operator specific logic to clean up after it is stopped.""" pass diff --git a/tfx/tools/cli/handler/dag_runner_patcher.py b/tfx/tools/cli/handler/dag_runner_patcher.py index 36e645e343..c42b5ce338 100644 --- a/tfx/tools/cli/handler/dag_runner_patcher.py +++ b/tfx/tools/cli/handler/dag_runner_patcher.py @@ -56,12 +56,12 @@ def __init__(self, call_real_run=True): self._run_called = False self._call_real_run = call_real_run - def _before_run(self, runner: tfx_runner.TfxRunner, + def _before_run(self, runner: tfx_runner.TfxRunner, # noqa: B027 pipeline: Union[pipeline_pb2.Pipeline, tfx_pipeline.Pipeline], context: MutableMapping[str, Any]) -> None: pass - def _after_run(self, runner: tfx_runner.TfxRunner, + def _after_run(self, runner: tfx_runner.TfxRunner, # noqa: B027 pipeline: Union[pipeline_pb2.Pipeline, tfx_pipeline.Pipeline], context: MutableMapping[str, Any]) -> None: pass diff --git a/tfx/types/system_artifacts.py b/tfx/types/system_artifacts.py index 91b0e12783..8f7cef8933 100644 --- a/tfx/types/system_artifacts.py +++ b/tfx/types/system_artifacts.py @@ -21,7 +21,7 @@ from ml_metadata.metadata_store import mlmd_types -class SystemArtifact(abc.ABC):# noqa: B024 +class SystemArtifact(abc.ABC): # noqa: B024 """TFX system artifact base class. A user may create a subclass of SystemArtifact and override the diff --git a/tfx/types/system_executions.py b/tfx/types/system_executions.py index 611a7529d4..7eadbcd26f 100644 --- a/tfx/types/system_executions.py +++ b/tfx/types/system_executions.py @@ -21,7 +21,7 @@ from ml_metadata.metadata_store import mlmd_types -class SystemExecution(abc.ABC):# noqa: B024 +class SystemExecution(abc.ABC): # noqa: B024 """TFX system execution base class. A user may create a subclass of SystemExecution and override the