diff --git a/libraries/pipes/tests/dagster-pipes-tests/src/dagster_pipes_tests/suite.py b/libraries/pipes/tests/dagster-pipes-tests/src/dagster_pipes_tests/suite.py index 37247b9..b059d8e 100644 --- a/libraries/pipes/tests/dagster-pipes-tests/src/dagster_pipes_tests/suite.py +++ b/libraries/pipes/tests/dagster-pipes-tests/src/dagster_pipes_tests/suite.py @@ -106,15 +106,21 @@ class PipesTestSuite: # to run all the tests BASE_ARGS = ["change", "me"] - @parametrize("metadata", METADATA_CASES) - def test_context_reconstruction( + def test_components( self, + request, metadata: Dict[str, Any], + context_injector: PipesContextInjector, + message_reader: PipesMessageReader, tmpdir_factory, capsys, ): - """This test doesn't test anything in Dagster. Instead, it provides parameters to the external process, which should check if they are loaded correctly.""" + """This test checks if the external process can access the context and the message writer correctly. + + It sets an additional `--job-name` argument which can be used to check if the context was loaded correctly. + It sets an additional `--extras` argument which points to a json file with Pipes extras which should be loaded by the context loader. The test can use this path to check if the extras were loaded correctly. + """ work_dir = tmpdir_factory.mktemp("work_dir") extras_path = work_dir / "extras.json" @@ -122,48 +128,18 @@ def test_context_reconstruction( with open(str(extras_path), "w") as f: json.dump(metadata, f) - @asset - def my_asset( - context: AssetExecutionContext, - pipes_subprocess_client: PipesSubprocessClient, - ) -> MaterializeResult: - job_name = context.dagster_run.job_name - - args = self.BASE_ARGS + [ - "--env", - f"--extras={str(extras_path)}", - f"--job-name={job_name}", - ] - - return pipes_subprocess_client.run( - context=context, - command=args, - extras=metadata, - ).get_materialize_result() - - result = materialize( - [my_asset], - resources={"pipes_subprocess_client": PipesSubprocessClient()}, - raise_on_error=False, - ) - - assert result.success - - def test_components( - self, - context_injector: PipesContextInjector, - message_reader: PipesMessageReader, - tmpdir_factory, - capsys, - ): @asset def my_asset( context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient, ) -> MaterializeResult: args = self.BASE_ARGS + [ - "--env", - "--full", + "--extras", + str(extras_path), + "--job-name", + context.dagster_run.job_name, + "--test-name", + request.node.name, ] if isinstance(context_injector, PipesS3ContextInjector): @@ -202,13 +178,13 @@ def my_asset( ) def test_extras( self, + request, context_injector: PipesContextInjector, metadata: Dict[str, Any], tmpdir_factory, capsys, ): """This test doesn't test anything in Dagster. Instead, it provides extras to the external process, which should check if they are loaded correctly.""" - work_dir = tmpdir_factory.mktemp("work_dir") metadata_path = work_dir / "metadata.json" @@ -221,13 +197,11 @@ def my_asset( context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient, ) -> MaterializeResult: - job_name = context.dagster_run.job_name - args = self.BASE_ARGS + [ - "--full", - "--env", - f"--extras={metadata_path}", - f"--job-name={job_name}", + "--extras", + metadata_path, + "--test-name", + request.node.name, ] invocation_result = pipes_subprocess_client.run( @@ -261,11 +235,11 @@ def my_asset( def test_error_reporting( self, + request, tmpdir_factory, capsys, ): """This test checks if the external process sends an exception message correctly.""" - if not PIPES_CONFIG.general.error_reporting: pytest.skip("general.error_reporting is not enabled in pipes.toml") @@ -281,8 +255,8 @@ def my_asset( pipes_subprocess_client: PipesSubprocessClient, ): args = self.BASE_ARGS + [ - "--full", - "--throw-error", + "--test-name", + request.node.name, ] invocation_result = pipes_subprocess_client.run( @@ -327,6 +301,7 @@ def my_asset( def test_message_log( self, + request, tmpdir_factory, capsys, ): @@ -345,8 +320,8 @@ def my_asset( pipes_subprocess_client: PipesSubprocessClient, ): args = self.BASE_ARGS + [ - "--full", - "--logging", + "--test-name", + request.node.name, ] invocation_result = pipes_subprocess_client.run( @@ -383,7 +358,7 @@ def my_asset( if f"{level.lower().capitalize()} message" in line: assert level in line logged_levels.add(level) - + assert logged_levels == expected_levels assert ( "[pipes] did not receive any messages from external process" @@ -393,6 +368,7 @@ def my_asset( @parametrize("custom_message_payload", CUSTOM_MESSAGE_CASES) def test_message_report_custom_message( self, + request, custom_message_payload: Any, tmpdir_factory, capsys, @@ -414,14 +390,11 @@ def my_asset( context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient, ) -> MaterializeResult: - job_name = context.dagster_run.job_name - args = self.BASE_ARGS + [ - "--full", - "--env", - f"--job-name={job_name}", - "--custom-payload-path", + "--custom-payload", str(custom_payload_path), + "--test-name", + request.node.name, ] invocation_result = pipes_subprocess_client.run( @@ -454,6 +427,7 @@ def my_asset( @parametrize("asset_key", [None, ["my_asset"]]) def test_message_report_asset_materialization( self, + request, data_version: Optional[str], asset_key: Optional[List[str]], tmpdir_factory, @@ -490,14 +464,11 @@ def my_asset( context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient, ) -> MaterializeResult: - job_name = context.dagster_run.job_name - args = self.BASE_ARGS + [ - "--full", - "--env", - f"--job-name={job_name}", "--report-asset-materialization", str(asset_materialization_path), + "--test-name", + request.node.name, ] invocation_result = pipes_subprocess_client.run( @@ -546,6 +517,7 @@ def my_asset( @parametrize("asset_key", [None, ["my_asset"]]) def test_message_report_asset_check( self, + request, passed: bool, asset_key: Optional[List[str]], severity: PipesAssetCheckSeverity, @@ -555,9 +527,7 @@ def test_message_report_asset_check( """This test checks if the external process sends asset checks correctly.""" if not PIPES_CONFIG.messages.report_asset_check: - pytest.skip( - "messages.report_asset_check is not enabled in pipes.toml" - ) + pytest.skip("messages.report_asset_check is not enabled in pipes.toml") work_dir = tmpdir_factory.mktemp("work_dir") @@ -587,14 +557,11 @@ def my_asset( context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient, ): - job_name = context.dagster_run.job_name - args = self.BASE_ARGS + [ - "--full", - "--env", - f"--job-name={job_name}", "--report-asset-check", str(report_asset_check_path), + "--test-name", + request.node.name, ] invocation_result = pipes_subprocess_client.run(