From 292cc3e67bd874d0af94feb5d1f1fea4481b8411 Mon Sep 17 00:00:00 2001 From: danielgafni Date: Wed, 25 Dec 2024 16:40:08 +0100 Subject: [PATCH 1/7] [pipes] refactor Pipes tests interface --- .../src/dagster_pipes_tests/suite.py | 109 ++++++------------ 1 file changed, 38 insertions(+), 71 deletions(-) diff --git a/libraries/pipes/tests/dagster-pipes-tests/src/dagster_pipes_tests/suite.py b/libraries/pipes/tests/dagster-pipes-tests/src/dagster_pipes_tests/suite.py index 37247b9..b059d8e 100644 --- a/libraries/pipes/tests/dagster-pipes-tests/src/dagster_pipes_tests/suite.py +++ b/libraries/pipes/tests/dagster-pipes-tests/src/dagster_pipes_tests/suite.py @@ -106,15 +106,21 @@ class PipesTestSuite: # to run all the tests BASE_ARGS = ["change", "me"] - @parametrize("metadata", METADATA_CASES) - def test_context_reconstruction( + def test_components( self, + request, metadata: Dict[str, Any], + context_injector: PipesContextInjector, + message_reader: PipesMessageReader, tmpdir_factory, capsys, ): - """This test doesn't test anything in Dagster. Instead, it provides parameters to the external process, which should check if they are loaded correctly.""" + """This test checks if the external process can access the context and the message writer correctly. + + It sets an additional `--job-name` argument which can be used to check if the context was loaded correctly. + It sets an additional `--extras` argument which points to a json file with Pipes extras which should be loaded by the context loader. The test can use this path to check if the extras were loaded correctly. + """ work_dir = tmpdir_factory.mktemp("work_dir") extras_path = work_dir / "extras.json" @@ -122,48 +128,18 @@ def test_context_reconstruction( with open(str(extras_path), "w") as f: json.dump(metadata, f) - @asset - def my_asset( - context: AssetExecutionContext, - pipes_subprocess_client: PipesSubprocessClient, - ) -> MaterializeResult: - job_name = context.dagster_run.job_name - - args = self.BASE_ARGS + [ - "--env", - f"--extras={str(extras_path)}", - f"--job-name={job_name}", - ] - - return pipes_subprocess_client.run( - context=context, - command=args, - extras=metadata, - ).get_materialize_result() - - result = materialize( - [my_asset], - resources={"pipes_subprocess_client": PipesSubprocessClient()}, - raise_on_error=False, - ) - - assert result.success - - def test_components( - self, - context_injector: PipesContextInjector, - message_reader: PipesMessageReader, - tmpdir_factory, - capsys, - ): @asset def my_asset( context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient, ) -> MaterializeResult: args = self.BASE_ARGS + [ - "--env", - "--full", + "--extras", + str(extras_path), + "--job-name", + context.dagster_run.job_name, + "--test-name", + request.node.name, ] if isinstance(context_injector, PipesS3ContextInjector): @@ -202,13 +178,13 @@ def my_asset( ) def test_extras( self, + request, context_injector: PipesContextInjector, metadata: Dict[str, Any], tmpdir_factory, capsys, ): """This test doesn't test anything in Dagster. Instead, it provides extras to the external process, which should check if they are loaded correctly.""" - work_dir = tmpdir_factory.mktemp("work_dir") metadata_path = work_dir / "metadata.json" @@ -221,13 +197,11 @@ def my_asset( context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient, ) -> MaterializeResult: - job_name = context.dagster_run.job_name - args = self.BASE_ARGS + [ - "--full", - "--env", - f"--extras={metadata_path}", - f"--job-name={job_name}", + "--extras", + metadata_path, + "--test-name", + request.node.name, ] invocation_result = pipes_subprocess_client.run( @@ -261,11 +235,11 @@ def my_asset( def test_error_reporting( self, + request, tmpdir_factory, capsys, ): """This test checks if the external process sends an exception message correctly.""" - if not PIPES_CONFIG.general.error_reporting: pytest.skip("general.error_reporting is not enabled in pipes.toml") @@ -281,8 +255,8 @@ def my_asset( pipes_subprocess_client: PipesSubprocessClient, ): args = self.BASE_ARGS + [ - "--full", - "--throw-error", + "--test-name", + request.node.name, ] invocation_result = pipes_subprocess_client.run( @@ -327,6 +301,7 @@ def my_asset( def test_message_log( self, + request, tmpdir_factory, capsys, ): @@ -345,8 +320,8 @@ def my_asset( pipes_subprocess_client: PipesSubprocessClient, ): args = self.BASE_ARGS + [ - "--full", - "--logging", + "--test-name", + request.node.name, ] invocation_result = pipes_subprocess_client.run( @@ -383,7 +358,7 @@ def my_asset( if f"{level.lower().capitalize()} message" in line: assert level in line logged_levels.add(level) - + assert logged_levels == expected_levels assert ( "[pipes] did not receive any messages from external process" @@ -393,6 +368,7 @@ def my_asset( @parametrize("custom_message_payload", CUSTOM_MESSAGE_CASES) def test_message_report_custom_message( self, + request, custom_message_payload: Any, tmpdir_factory, capsys, @@ -414,14 +390,11 @@ def my_asset( context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient, ) -> MaterializeResult: - job_name = context.dagster_run.job_name - args = self.BASE_ARGS + [ - "--full", - "--env", - f"--job-name={job_name}", - "--custom-payload-path", + "--custom-payload", str(custom_payload_path), + "--test-name", + request.node.name, ] invocation_result = pipes_subprocess_client.run( @@ -454,6 +427,7 @@ def my_asset( @parametrize("asset_key", [None, ["my_asset"]]) def test_message_report_asset_materialization( self, + request, data_version: Optional[str], asset_key: Optional[List[str]], tmpdir_factory, @@ -490,14 +464,11 @@ def my_asset( context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient, ) -> MaterializeResult: - job_name = context.dagster_run.job_name - args = self.BASE_ARGS + [ - "--full", - "--env", - f"--job-name={job_name}", "--report-asset-materialization", str(asset_materialization_path), + "--test-name", + request.node.name, ] invocation_result = pipes_subprocess_client.run( @@ -546,6 +517,7 @@ def my_asset( @parametrize("asset_key", [None, ["my_asset"]]) def test_message_report_asset_check( self, + request, passed: bool, asset_key: Optional[List[str]], severity: PipesAssetCheckSeverity, @@ -555,9 +527,7 @@ def test_message_report_asset_check( """This test checks if the external process sends asset checks correctly.""" if not PIPES_CONFIG.messages.report_asset_check: - pytest.skip( - "messages.report_asset_check is not enabled in pipes.toml" - ) + pytest.skip("messages.report_asset_check is not enabled in pipes.toml") work_dir = tmpdir_factory.mktemp("work_dir") @@ -587,14 +557,11 @@ def my_asset( context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient, ): - job_name = context.dagster_run.job_name - args = self.BASE_ARGS + [ - "--full", - "--env", - f"--job-name={job_name}", "--report-asset-check", str(report_asset_check_path), + "--test-name", + request.node.name, ] invocation_result = pipes_subprocess_client.run( From fbba3f70b9158e883532f34c57285747eb949824 Mon Sep 17 00:00:00 2001 From: danielgafni Date: Wed, 25 Dec 2024 16:50:53 +0100 Subject: [PATCH 2/7] fix --test-name --- .../src/dagster_pipes_tests/suite.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/libraries/pipes/tests/dagster-pipes-tests/src/dagster_pipes_tests/suite.py b/libraries/pipes/tests/dagster-pipes-tests/src/dagster_pipes_tests/suite.py index b059d8e..33f4c07 100644 --- a/libraries/pipes/tests/dagster-pipes-tests/src/dagster_pipes_tests/suite.py +++ b/libraries/pipes/tests/dagster-pipes-tests/src/dagster_pipes_tests/suite.py @@ -56,6 +56,10 @@ METADATA = json.loads(METADATA_PATH.read_text()) + +def _get_current_test_name(request): + return request.node.name.split("[")[0] + def _resolve_metadata_value( value: Any, metadata_type: PipesMetadataType ) -> MetadataValue: @@ -139,7 +143,7 @@ def my_asset( "--job-name", context.dagster_run.job_name, "--test-name", - request.node.name, + _get_current_test_name(request), ] if isinstance(context_injector, PipesS3ContextInjector): @@ -201,7 +205,7 @@ def my_asset( "--extras", metadata_path, "--test-name", - request.node.name, + _get_current_test_name(request), ] invocation_result = pipes_subprocess_client.run( @@ -256,7 +260,7 @@ def my_asset( ): args = self.BASE_ARGS + [ "--test-name", - request.node.name, + _get_current_test_name(request), ] invocation_result = pipes_subprocess_client.run( @@ -321,7 +325,7 @@ def my_asset( ): args = self.BASE_ARGS + [ "--test-name", - request.node.name, + _get_current_test_name(request), ] invocation_result = pipes_subprocess_client.run( @@ -394,7 +398,7 @@ def my_asset( "--custom-payload", str(custom_payload_path), "--test-name", - request.node.name, + _get_current_test_name(request), ] invocation_result = pipes_subprocess_client.run( @@ -468,7 +472,7 @@ def my_asset( "--report-asset-materialization", str(asset_materialization_path), "--test-name", - request.node.name, + _get_current_test_name(request), ] invocation_result = pipes_subprocess_client.run( @@ -561,7 +565,7 @@ def my_asset( "--report-asset-check", str(report_asset_check_path), "--test-name", - request.node.name, + _get_current_test_name(request), ] invocation_result = pipes_subprocess_client.run( From 68d19d5de116109e5a0c381f2f463f8bac7a497d Mon Sep 17 00:00:00 2001 From: GingerYouth Date: Wed, 25 Dec 2024 18:56:10 +0300 Subject: [PATCH 3/7] Big tests refactoring --- .../src/test/java/pipes/MainTest.java | 118 +++++++----------- .../src/test/java/pipes/PipesTests.java | 40 +++--- 2 files changed, 60 insertions(+), 98 deletions(-) diff --git a/libraries/pipes/implementations/java/dagster-pipes-java/src/test/java/pipes/MainTest.java b/libraries/pipes/implementations/java/dagster-pipes-java/src/test/java/pipes/MainTest.java index e6424d5..ccc4f74 100644 --- a/libraries/pipes/implementations/java/dagster-pipes-java/src/test/java/pipes/MainTest.java +++ b/libraries/pipes/implementations/java/dagster-pipes-java/src/test/java/pipes/MainTest.java @@ -28,24 +28,12 @@ public class MainTest implements Runnable { private final ObjectMapper objectMapper = new ObjectMapper(); private Map cachedJson = new ConcurrentHashMap<>(); - @CommandLine.Option( - names = {"--context"}, - description = "Provide DAGSTER_PIPES_CONTEXT value for testing" - ) - private String context; - - @CommandLine.Option( - names = {"--messages"}, - description = "Provide DAGSTER_PIPES_MESSAGES value for testing" - ) - private String messages; - - @CommandLine.Option( - names = {"--env"}, - description = "Get DAGSTER_PIPES_MESSAGES & DAGSTER_PIPES_CONTEXT values " + - "from environmental variables" - ) - private boolean env = false; +// @CommandLine.Option( +// names = {"--env"}, +// description = "Get DAGSTER_PIPES_MESSAGES & DAGSTER_PIPES_CONTEXT values " + +// "from environmental variables" +// ) +// private boolean env = false; @CommandLine.Option( names = {"--job-name"}, @@ -59,14 +47,14 @@ public class MainTest implements Runnable { ) private String extras; - @CommandLine.Option( - names = {"--full"}, - description = "Flag to test full PipesContext usage" - ) - private boolean full = false; +// @CommandLine.Option( +// names = {"--full"}, +// description = "Flag to test full PipesContext usage" +// ) +// private boolean full = false; @CommandLine.Option( - names = {"--custom-payload-path"}, + names = {"--custom-payload"}, description = "Specify custom payload path" ) private String customPayloadPath; @@ -83,17 +71,17 @@ public class MainTest implements Runnable { ) private String reportAssetMaterializationJson; - @CommandLine.Option( - names = {"--throw-error"}, - description = "Throw exception in PipesSession with specified message" - ) - private boolean throwException = false; +// @CommandLine.Option( +// names = {"--throw-error"}, +// description = "Throw exception in PipesSession with specified message" +// ) +// private boolean throwException = false; - @CommandLine.Option( - names = {"--logging"}, - description = "Flag to test logging" - ) - private boolean logging = false; +// @CommandLine.Option( +// names = {"--logging"}, +// description = "Flag to test logging" +// ) +// private boolean logging = false; @CommandLine.Option( names = {"--message-writer"}, @@ -107,47 +95,34 @@ public class MainTest implements Runnable { ) private String contextLoaderType; + @CommandLine.Option( + names = {"--test-name"}, + description = "Specify the name of the test" + ) + private String testName; + @Override public void run() { - Map input = new HashMap<>(); PipesTests pipesTests = new PipesTests(); try { - if (this.context != null) { - input.put(PipesConstants.CONTEXT_ENV_VAR.name, context); - } - if (this.messages != null) { - input.put(PipesConstants.MESSAGES_ENV_VAR.name, this.messages); - } - pipesTests.setInput(input); - final PipesContextLoader loader; if (this.contextLoaderType != null && !this.contextLoaderType.isEmpty()) { - switch (this.contextLoaderType) { - case "s3": - S3Client amazonS3Client = S3Client.builder().build(); - loader = new PipesS3ContextLoader(amazonS3Client); - break; - case "default": - loader = new PipesDefaultContextLoader(); - break; - default: - throw new IllegalArgumentException("Specified unknown context loader type!"); + if (this.contextLoaderType.equals("s3")) { + S3Client amazonS3Client = S3Client.builder().build(); + loader = new PipesS3ContextLoader(amazonS3Client); + } else { + loader = new PipesDefaultContextLoader(); } pipesTests.setContextLoader(loader); } final PipesMessageWriter writer; if (this.messageWriter != null && !this.messageWriter.isEmpty()) { - switch (this.messageWriter) { - case "s3": - S3Client amazonS3Client = S3Client.builder().build(); - writer = new PipesS3MessageWriter(amazonS3Client); - break; - case "default": - writer = new PipesDefaultMessageWriter(); - break; - default: - throw new IllegalArgumentException("Specified unknown message writer!"); + if (this.messageWriter.equals("s3")) { + S3Client amazonS3Client = S3Client.builder().build(); + writer = new PipesS3MessageWriter(amazonS3Client); + } else { + writer = new PipesDefaultMessageWriter(); } pipesTests.setMessageWriter(writer); } @@ -159,12 +134,12 @@ public void run() { pipesTests.setPayload(payload); } - if (this.throwException) { + if (this.testName != null && this.testName.equals("test_error_reporting")) { pipesTests.testRunPipesSessionWithException(); return; } - if (this.logging) { + if (this.testName != null && this.testName.equals("test_message_log")) { pipesTests.testLogging(); return; } @@ -186,27 +161,20 @@ public void run() { pipesTests.setCheck(checkName, passed, assetKey); } - if (this.full) { - pipesTests.fullTest(); - return; - } else { - pipesTests.setContextData(); - } - if (this.extras != null) { File jsonFile = new File(this.extras); ObjectMapper objectMapper = new ObjectMapper(); Map extrasMap = objectMapper.readValue( - jsonFile, new TypeReference>() {} + jsonFile, new TypeReference>() {} ); pipesTests.setExtras(extrasMap); - pipesTests.testExtras(); } if (this.jobName != null) { pipesTests.setJobName(this.jobName); - pipesTests.testJobName(); } + + pipesTests.fullTest(); } catch (IOException | DagsterPipesException exception) { throw new RuntimeException(exception); } diff --git a/libraries/pipes/implementations/java/dagster-pipes-java/src/test/java/pipes/PipesTests.java b/libraries/pipes/implementations/java/dagster-pipes-java/src/test/java/pipes/PipesTests.java index 06424ad..6748711 100644 --- a/libraries/pipes/implementations/java/dagster-pipes-java/src/test/java/pipes/PipesTests.java +++ b/libraries/pipes/implementations/java/dagster-pipes-java/src/test/java/pipes/PipesTests.java @@ -51,10 +51,6 @@ void setJobName(String jobName) { this.jobName = jobName; } - void setContextData() throws DagsterPipesException { - this.contextData = DataLoader.getData(input); - } - void setContextLoader(PipesContextLoader contextLoader) throws DagsterPipesException { this.contextLoader = contextLoader; } @@ -81,25 +77,6 @@ void setMessageWriter(PipesMessageWriter wr this.pipesMessageWriter = writer; } - @Test - void testExtras() { - Assertions.assertTrue( - contextData.getExtras().entrySet().containsAll(this.extras.entrySet()), - "Extras does not contain all provided entries." - ); - System.out.println("Extras are correct."); - } - - @Test - void testJobName() { - Assertions.assertEquals( - this.jobName, - contextData.getJobName(), - "JobName is incorrect." - ); - System.out.println("JobName is correct."); - } - @Test void fullTest() throws DagsterPipesException { getTestSession().runDagsterPipes(this::fullTest); @@ -108,6 +85,23 @@ void fullTest() throws DagsterPipesException { private void fullTest(PipesContext context) throws DagsterPipesException { context.reportCustomMessage("Hello from external process!"); + if (this.extras != null) { + Assertions.assertTrue( + context.getExtras().entrySet().containsAll(this.extras.entrySet()), + "Extras does not contain all provided entries." + ); + System.out.println("Extras are correct."); + } + + if (this.jobName != null) { + Assertions.assertEquals( + this.jobName, + context.getJobName(), + "JobName is incorrect." + ); + System.out.println("JobName is correct."); + } + if (this.payload != null) { context.reportCustomMessage(this.payload); System.out.println("Payload reported with custom message."); From 385428e01c9c1362d518bf15340e21ff015b882a Mon Sep 17 00:00:00 2001 From: danielgafni Date: Wed, 25 Dec 2024 16:40:08 +0100 Subject: [PATCH 4/7] [pipes] refactor Pipes tests interface --- .../pipes/tests/dagster-pipes-tests/README.md | 25 +++- .../src/dagster_pipes_tests/suite.py | 113 +++++++----------- 2 files changed, 61 insertions(+), 77 deletions(-) diff --git a/libraries/pipes/tests/dagster-pipes-tests/README.md b/libraries/pipes/tests/dagster-pipes-tests/README.md index f88d26a..82f9516 100644 --- a/libraries/pipes/tests/dagster-pipes-tests/README.md +++ b/libraries/pipes/tests/dagster-pipes-tests/README.md @@ -20,15 +20,17 @@ See the [pipes_config.py](src/dagster_pipes_tests/pipes_config.py) class for mor In order to run the tests, follow these steps: -1. Install `pytest` and `dagster-pipes-tests`: +1. Install `pytest` and `dagster-pipes-tests`. This can be done with [uv](https://docs.astral.sh/uv/): ```shell -uv pip install pytest -# TODO: publish the package to PyPI -uv pip install +# assuming the command is run in libraries/pipes/implementations/ +uv add --group dev pytest --editable ../../tests/dagster-pipes-tests ``` -2. Import the test suite in your `pytest` code and configure it with the base arguments (usually containing the testing executable). The executable will be invoked with various arguments, and the test suite will assert certain side effects produced by the executable. Base arguments will be concatenated with additional arguments provided by the test suite. +> [!NOTE] +> To install `dagster-pipes-tests` in a repository other than this one, replace `--editable ../../tests/dagster-pipes-tests` with `git+https://github.com/dagster-io/communioty-integrations.git#subdirectory=libraries/pipes/tests/dagster-pipes-tests` + +2. Import the test suite in your `pytest` code (for example, in `tests/test_pipes.py`) and configure it with the base arguments (usually containing the testing executable). The executable will be invoked with various arguments, and the test suite will assert certain side effects produced by the executable. Base arguments will be concatenated with additional arguments provided by the test suite. For example, for Java: @@ -45,8 +47,13 @@ class TestJavaPipes(PipesTestSuite): ] ``` -3 [Optional]. When working with compiled languages, it's recommended to setup a `pytest` fixture that compiles the executable before running the tests. This way, the executable is only compiled once, and the tests can be run multiple times without recompiling. +> [!NOTE] +> Each test has it's own `--test-name` argument which can be used to identify the test being run. + +> [!WARNING] +> This code must be placed in a file that is discovered by `pytest`, e.g. starts with `test_`. +When working with compiled languages, it's recommended to setup a `pytest` fixture that compiles the executable before running the tests. This way, the executable is only compiled once, and the tests can be run multiple times without recompiling. For example, for Java, put the following code in `conftest.py`: @@ -59,3 +66,9 @@ import subprocess def built_jar(): subprocess.run(["./gradlew", "build"], check=True) ``` + +4. Run the tests with `pytest`: + +```shell +uv run pytest +``` diff --git a/libraries/pipes/tests/dagster-pipes-tests/src/dagster_pipes_tests/suite.py b/libraries/pipes/tests/dagster-pipes-tests/src/dagster_pipes_tests/suite.py index acb7113..3bd420c 100644 --- a/libraries/pipes/tests/dagster-pipes-tests/src/dagster_pipes_tests/suite.py +++ b/libraries/pipes/tests/dagster-pipes-tests/src/dagster_pipes_tests/suite.py @@ -57,6 +57,10 @@ METADATA = json.loads(METADATA_PATH.read_text()) + +def _get_current_test_name(request): + return request.node.name.split("[")[0] + def _resolve_metadata_value( value: Any, metadata_type: PipesMetadataType ) -> MetadataValue: @@ -107,15 +111,21 @@ class PipesTestSuite: # to run all the tests BASE_ARGS = ["change", "me"] - @parametrize("metadata", METADATA_CASES) - def test_context_reconstruction( + def test_components( self, + request, metadata: Dict[str, Any], + context_injector: PipesContextInjector, + message_reader: PipesMessageReader, tmpdir_factory, capsys, ): - """This test doesn't test anything in Dagster. Instead, it provides parameters to the external process, which should check if they are loaded correctly.""" + """This test checks if the external process can access the context and the message writer correctly. + It sets an additional `--job-name` argument which can be used to check if the context was loaded correctly. + + It sets an additional `--extras` argument which points to a json file with Pipes extras which should be loaded by the context loader. The test can use this path to check if the extras were loaded correctly. + """ work_dir = tmpdir_factory.mktemp("work_dir") extras_path = work_dir / "extras.json" @@ -123,48 +133,18 @@ def test_context_reconstruction( with open(str(extras_path), "w") as f: json.dump(metadata, f) - @asset - def my_asset( - context: AssetExecutionContext, - pipes_subprocess_client: PipesSubprocessClient, - ) -> MaterializeResult: - job_name = context.run.job_name - - args = self.BASE_ARGS + [ - "--env", - f"--extras={str(extras_path)}", - f"--job-name={job_name}", - ] - - return pipes_subprocess_client.run( - context=context, - command=args, - extras=metadata, - ).get_materialize_result() - - result = materialize( - [my_asset], - resources={"pipes_subprocess_client": PipesSubprocessClient()}, - raise_on_error=False, - ) - - assert result.success - - def test_components( - self, - context_injector: PipesContextInjector, - message_reader: PipesMessageReader, - tmpdir_factory, - capsys, - ): @asset def my_asset( context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient, ) -> MaterializeResult: args = self.BASE_ARGS + [ - "--env", - "--full", + "--extras", + str(extras_path), + "--job-name", + context.run.job_name, + "--test-name", + _get_current_test_name(request), ] if isinstance(context_injector, PipesS3ContextInjector): @@ -203,13 +183,13 @@ def my_asset( ) def test_extras( self, + request, context_injector: PipesContextInjector, metadata: Dict[str, Any], tmpdir_factory, capsys, ): """This test doesn't test anything in Dagster. Instead, it provides extras to the external process, which should check if they are loaded correctly.""" - work_dir = tmpdir_factory.mktemp("work_dir") metadata_path = work_dir / "metadata.json" @@ -222,13 +202,11 @@ def my_asset( context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient, ) -> MaterializeResult: - job_name = context.run.job_name - args = self.BASE_ARGS + [ - "--full", - "--env", - f"--extras={metadata_path}", - f"--job-name={job_name}", + "--extras", + metadata_path, + "--test-name", + _get_current_test_name(request), ] invocation_result = pipes_subprocess_client.run( @@ -262,11 +240,11 @@ def my_asset( def test_error_reporting( self, + request, tmpdir_factory, capsys, ): """This test checks if the external process sends an exception message correctly.""" - if not PIPES_CONFIG.general.error_reporting: pytest.skip("general.error_reporting is not enabled in pipes.toml") @@ -282,8 +260,8 @@ def my_asset( pipes_subprocess_client: PipesSubprocessClient, ): args = self.BASE_ARGS + [ - "--full", - "--throw-error", + "--test-name", + _get_current_test_name(request), ] invocation_result = pipes_subprocess_client.run( @@ -328,6 +306,7 @@ def my_asset( def test_message_log( self, + request, tmpdir_factory, capsys, ): @@ -346,8 +325,8 @@ def my_asset( pipes_subprocess_client: PipesSubprocessClient, ): args = self.BASE_ARGS + [ - "--full", - "--logging", + "--test-name", + _get_current_test_name(request), ] invocation_result = pipes_subprocess_client.run( @@ -384,7 +363,7 @@ def my_asset( if f"{level.lower().capitalize()} message" in line: assert level in line logged_levels.add(level) - + assert logged_levels == expected_levels assert ( "[pipes] did not receive any messages from external process" @@ -394,6 +373,7 @@ def my_asset( @parametrize("custom_message_payload", CUSTOM_MESSAGE_CASES) def test_message_report_custom_message( self, + request, custom_message_payload: Any, tmpdir_factory, capsys, @@ -415,14 +395,11 @@ def my_asset( context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient, ) -> MaterializeResult: - job_name = context.run.job_name - args = self.BASE_ARGS + [ - "--full", - "--env", - f"--job-name={job_name}", - "--custom-payload-path", + "--custom-payload", str(custom_payload_path), + "--test-name", + _get_current_test_name(request), ] invocation_result = pipes_subprocess_client.run( @@ -455,6 +432,7 @@ def my_asset( @parametrize("asset_key", [None, ["my_asset"]]) def test_message_report_asset_materialization( self, + request, data_version: Optional[str], asset_key: Optional[List[str]], tmpdir_factory, @@ -491,14 +469,11 @@ def my_asset( context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient, ) -> MaterializeResult: - job_name = context.run.job_name - args = self.BASE_ARGS + [ - "--full", - "--env", - f"--job-name={job_name}", "--report-asset-materialization", str(asset_materialization_path), + "--test-name", + _get_current_test_name(request), ] invocation_result = pipes_subprocess_client.run( @@ -547,6 +522,7 @@ def my_asset( @parametrize("asset_key", [None, ["my_asset"]]) def test_message_report_asset_check( self, + request, passed: bool, asset_key: Optional[List[str]], severity: PipesAssetCheckSeverity, @@ -556,9 +532,7 @@ def test_message_report_asset_check( """This test checks if the external process sends asset checks correctly.""" if not PIPES_CONFIG.messages.report_asset_check: - pytest.skip( - "messages.report_asset_check is not enabled in pipes.toml" - ) + pytest.skip("messages.report_asset_check is not enabled in pipes.toml") work_dir = tmpdir_factory.mktemp("work_dir") @@ -588,14 +562,11 @@ def my_asset( context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient, ): - job_name = context.run.job_name - args = self.BASE_ARGS + [ - "--full", - "--env", - f"--job-name={job_name}", "--report-asset-check", str(report_asset_check_path), + "--test-name", + _get_current_test_name(request), ] invocation_result = pipes_subprocess_client.run( From 565244e2b698b218d0aa32979e7750ccea7b117f Mon Sep 17 00:00:00 2001 From: GingerYouth Date: Sat, 28 Dec 2024 12:21:11 +0300 Subject: [PATCH 5/7] Fixed bad merge --- .../src/dagster_pipes_tests/suite.py | 115 +++++++++++------- 1 file changed, 70 insertions(+), 45 deletions(-) diff --git a/libraries/pipes/tests/dagster-pipes-tests/src/dagster_pipes_tests/suite.py b/libraries/pipes/tests/dagster-pipes-tests/src/dagster_pipes_tests/suite.py index a28715e..a5971b0 100644 --- a/libraries/pipes/tests/dagster-pipes-tests/src/dagster_pipes_tests/suite.py +++ b/libraries/pipes/tests/dagster-pipes-tests/src/dagster_pipes_tests/suite.py @@ -57,10 +57,6 @@ METADATA = json.loads(METADATA_PATH.read_text()) - -def _get_current_test_name(request): - return request.node.name.split("[")[0] - def _resolve_metadata_value( value: Any, metadata_type: PipesMetadataType ) -> MetadataValue: @@ -111,21 +107,15 @@ class PipesTestSuite: # to run all the tests BASE_ARGS = ["change", "me"] - def test_components( + @parametrize("metadata", METADATA_CASES) + def test_context_reconstruction( self, - request, metadata: Dict[str, Any], - context_injector: PipesContextInjector, - message_reader: PipesMessageReader, tmpdir_factory, capsys, ): - """This test checks if the external process can access the context and the message writer correctly. + """This test doesn't test anything in Dagster. Instead, it provides parameters to the external process, which should check if they are loaded correctly.""" - It sets an additional `--job-name` argument which can be used to check if the context was loaded correctly. - - It sets an additional `--extras` argument which points to a json file with Pipes extras which should be loaded by the context loader. The test can use this path to check if the extras were loaded correctly. - """ work_dir = tmpdir_factory.mktemp("work_dir") extras_path = work_dir / "extras.json" @@ -138,17 +128,43 @@ def my_asset( context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient, ) -> MaterializeResult: + job_name = context.run.job_name + args = self.BASE_ARGS + [ - "--extras", - str(extras_path), - "--job-name", -<<<<<<< HEAD - context.dagster_run.job_name, -======= - context.run.job_name, ->>>>>>> 385428e01c9c1362d518bf15340e21ff015b882a - "--test-name", - _get_current_test_name(request), + "--env", + f"--extras={str(extras_path)}", + f"--job-name={job_name}", + ] + + return pipes_subprocess_client.run( + context=context, + command=args, + extras=metadata, + ).get_materialize_result() + + result = materialize( + [my_asset], + resources={"pipes_subprocess_client": PipesSubprocessClient()}, + raise_on_error=False, + ) + + assert result.success + + def test_components( + self, + context_injector: PipesContextInjector, + message_reader: PipesMessageReader, + tmpdir_factory, + capsys, + ): + @asset + def my_asset( + context: AssetExecutionContext, + pipes_subprocess_client: PipesSubprocessClient, + ) -> MaterializeResult: + args = self.BASE_ARGS + [ + "--env", + "--full", ] if isinstance(context_injector, PipesS3ContextInjector): @@ -187,13 +203,13 @@ def my_asset( ) def test_extras( self, - request, context_injector: PipesContextInjector, metadata: Dict[str, Any], tmpdir_factory, capsys, ): """This test doesn't test anything in Dagster. Instead, it provides extras to the external process, which should check if they are loaded correctly.""" + work_dir = tmpdir_factory.mktemp("work_dir") metadata_path = work_dir / "metadata.json" @@ -206,11 +222,13 @@ def my_asset( context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient, ) -> MaterializeResult: + job_name = context.run.job_name + args = self.BASE_ARGS + [ - "--extras", - metadata_path, - "--test-name", - _get_current_test_name(request), + "--full", + "--env", + f"--extras={metadata_path}", + f"--job-name={job_name}", ] invocation_result = pipes_subprocess_client.run( @@ -244,11 +262,11 @@ def my_asset( def test_error_reporting( self, - request, tmpdir_factory, capsys, ): """This test checks if the external process sends an exception message correctly.""" + if not PIPES_CONFIG.general.error_reporting: pytest.skip("general.error_reporting is not enabled in pipes.toml") @@ -264,8 +282,8 @@ def my_asset( pipes_subprocess_client: PipesSubprocessClient, ): args = self.BASE_ARGS + [ - "--test-name", - _get_current_test_name(request), + "--full", + "--throw-error", ] invocation_result = pipes_subprocess_client.run( @@ -310,7 +328,6 @@ def my_asset( def test_message_log( self, - request, tmpdir_factory, capsys, ): @@ -329,8 +346,8 @@ def my_asset( pipes_subprocess_client: PipesSubprocessClient, ): args = self.BASE_ARGS + [ - "--test-name", - _get_current_test_name(request), + "--full", + "--logging", ] invocation_result = pipes_subprocess_client.run( @@ -377,7 +394,6 @@ def my_asset( @parametrize("custom_message_payload", CUSTOM_MESSAGE_CASES) def test_message_report_custom_message( self, - request, custom_message_payload: Any, tmpdir_factory, capsys, @@ -399,11 +415,14 @@ def my_asset( context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient, ) -> MaterializeResult: + job_name = context.run.job_name + args = self.BASE_ARGS + [ - "--custom-payload", + "--full", + "--env", + f"--job-name={job_name}", + "--custom-payload-path", str(custom_payload_path), - "--test-name", - _get_current_test_name(request), ] invocation_result = pipes_subprocess_client.run( @@ -436,7 +455,6 @@ def my_asset( @parametrize("asset_key", [None, ["my_asset"]]) def test_message_report_asset_materialization( self, - request, data_version: Optional[str], asset_key: Optional[List[str]], tmpdir_factory, @@ -473,11 +491,14 @@ def my_asset( context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient, ) -> MaterializeResult: + job_name = context.run.job_name + args = self.BASE_ARGS + [ + "--full", + "--env", + f"--job-name={job_name}", "--report-asset-materialization", str(asset_materialization_path), - "--test-name", - _get_current_test_name(request), ] invocation_result = pipes_subprocess_client.run( @@ -526,7 +547,6 @@ def my_asset( @parametrize("asset_key", [None, ["my_asset"]]) def test_message_report_asset_check( self, - request, passed: bool, asset_key: Optional[List[str]], severity: PipesAssetCheckSeverity, @@ -536,7 +556,9 @@ def test_message_report_asset_check( """This test checks if the external process sends asset checks correctly.""" if not PIPES_CONFIG.messages.report_asset_check: - pytest.skip("messages.report_asset_check is not enabled in pipes.toml") + pytest.skip( + "messages.report_asset_check is not enabled in pipes.toml" + ) work_dir = tmpdir_factory.mktemp("work_dir") @@ -566,11 +588,14 @@ def my_asset( context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient, ): + job_name = context.run.job_name + args = self.BASE_ARGS + [ + "--full", + "--env", + f"--job-name={job_name}", "--report-asset-check", str(report_asset_check_path), - "--test-name", - _get_current_test_name(request), ] invocation_result = pipes_subprocess_client.run( From 74562edde1acbec405db4ba613a44c2f962881c5 Mon Sep 17 00:00:00 2001 From: GingerYouth Date: Sat, 28 Dec 2024 12:36:03 +0300 Subject: [PATCH 6/7] Suite.py --- .../src/dagster_pipes_tests/suite.py | 111 +++++++----------- 1 file changed, 41 insertions(+), 70 deletions(-) diff --git a/libraries/pipes/tests/dagster-pipes-tests/src/dagster_pipes_tests/suite.py b/libraries/pipes/tests/dagster-pipes-tests/src/dagster_pipes_tests/suite.py index a5971b0..3bd420c 100644 --- a/libraries/pipes/tests/dagster-pipes-tests/src/dagster_pipes_tests/suite.py +++ b/libraries/pipes/tests/dagster-pipes-tests/src/dagster_pipes_tests/suite.py @@ -57,6 +57,10 @@ METADATA = json.loads(METADATA_PATH.read_text()) + +def _get_current_test_name(request): + return request.node.name.split("[")[0] + def _resolve_metadata_value( value: Any, metadata_type: PipesMetadataType ) -> MetadataValue: @@ -107,15 +111,21 @@ class PipesTestSuite: # to run all the tests BASE_ARGS = ["change", "me"] - @parametrize("metadata", METADATA_CASES) - def test_context_reconstruction( + def test_components( self, + request, metadata: Dict[str, Any], + context_injector: PipesContextInjector, + message_reader: PipesMessageReader, tmpdir_factory, capsys, ): - """This test doesn't test anything in Dagster. Instead, it provides parameters to the external process, which should check if they are loaded correctly.""" + """This test checks if the external process can access the context and the message writer correctly. + It sets an additional `--job-name` argument which can be used to check if the context was loaded correctly. + + It sets an additional `--extras` argument which points to a json file with Pipes extras which should be loaded by the context loader. The test can use this path to check if the extras were loaded correctly. + """ work_dir = tmpdir_factory.mktemp("work_dir") extras_path = work_dir / "extras.json" @@ -128,43 +138,13 @@ def my_asset( context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient, ) -> MaterializeResult: - job_name = context.run.job_name - args = self.BASE_ARGS + [ - "--env", - f"--extras={str(extras_path)}", - f"--job-name={job_name}", - ] - - return pipes_subprocess_client.run( - context=context, - command=args, - extras=metadata, - ).get_materialize_result() - - result = materialize( - [my_asset], - resources={"pipes_subprocess_client": PipesSubprocessClient()}, - raise_on_error=False, - ) - - assert result.success - - def test_components( - self, - context_injector: PipesContextInjector, - message_reader: PipesMessageReader, - tmpdir_factory, - capsys, - ): - @asset - def my_asset( - context: AssetExecutionContext, - pipes_subprocess_client: PipesSubprocessClient, - ) -> MaterializeResult: - args = self.BASE_ARGS + [ - "--env", - "--full", + "--extras", + str(extras_path), + "--job-name", + context.run.job_name, + "--test-name", + _get_current_test_name(request), ] if isinstance(context_injector, PipesS3ContextInjector): @@ -203,13 +183,13 @@ def my_asset( ) def test_extras( self, + request, context_injector: PipesContextInjector, metadata: Dict[str, Any], tmpdir_factory, capsys, ): """This test doesn't test anything in Dagster. Instead, it provides extras to the external process, which should check if they are loaded correctly.""" - work_dir = tmpdir_factory.mktemp("work_dir") metadata_path = work_dir / "metadata.json" @@ -222,13 +202,11 @@ def my_asset( context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient, ) -> MaterializeResult: - job_name = context.run.job_name - args = self.BASE_ARGS + [ - "--full", - "--env", - f"--extras={metadata_path}", - f"--job-name={job_name}", + "--extras", + metadata_path, + "--test-name", + _get_current_test_name(request), ] invocation_result = pipes_subprocess_client.run( @@ -262,11 +240,11 @@ def my_asset( def test_error_reporting( self, + request, tmpdir_factory, capsys, ): """This test checks if the external process sends an exception message correctly.""" - if not PIPES_CONFIG.general.error_reporting: pytest.skip("general.error_reporting is not enabled in pipes.toml") @@ -282,8 +260,8 @@ def my_asset( pipes_subprocess_client: PipesSubprocessClient, ): args = self.BASE_ARGS + [ - "--full", - "--throw-error", + "--test-name", + _get_current_test_name(request), ] invocation_result = pipes_subprocess_client.run( @@ -328,6 +306,7 @@ def my_asset( def test_message_log( self, + request, tmpdir_factory, capsys, ): @@ -346,8 +325,8 @@ def my_asset( pipes_subprocess_client: PipesSubprocessClient, ): args = self.BASE_ARGS + [ - "--full", - "--logging", + "--test-name", + _get_current_test_name(request), ] invocation_result = pipes_subprocess_client.run( @@ -394,6 +373,7 @@ def my_asset( @parametrize("custom_message_payload", CUSTOM_MESSAGE_CASES) def test_message_report_custom_message( self, + request, custom_message_payload: Any, tmpdir_factory, capsys, @@ -415,14 +395,11 @@ def my_asset( context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient, ) -> MaterializeResult: - job_name = context.run.job_name - args = self.BASE_ARGS + [ - "--full", - "--env", - f"--job-name={job_name}", - "--custom-payload-path", + "--custom-payload", str(custom_payload_path), + "--test-name", + _get_current_test_name(request), ] invocation_result = pipes_subprocess_client.run( @@ -455,6 +432,7 @@ def my_asset( @parametrize("asset_key", [None, ["my_asset"]]) def test_message_report_asset_materialization( self, + request, data_version: Optional[str], asset_key: Optional[List[str]], tmpdir_factory, @@ -491,14 +469,11 @@ def my_asset( context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient, ) -> MaterializeResult: - job_name = context.run.job_name - args = self.BASE_ARGS + [ - "--full", - "--env", - f"--job-name={job_name}", "--report-asset-materialization", str(asset_materialization_path), + "--test-name", + _get_current_test_name(request), ] invocation_result = pipes_subprocess_client.run( @@ -547,6 +522,7 @@ def my_asset( @parametrize("asset_key", [None, ["my_asset"]]) def test_message_report_asset_check( self, + request, passed: bool, asset_key: Optional[List[str]], severity: PipesAssetCheckSeverity, @@ -556,9 +532,7 @@ def test_message_report_asset_check( """This test checks if the external process sends asset checks correctly.""" if not PIPES_CONFIG.messages.report_asset_check: - pytest.skip( - "messages.report_asset_check is not enabled in pipes.toml" - ) + pytest.skip("messages.report_asset_check is not enabled in pipes.toml") work_dir = tmpdir_factory.mktemp("work_dir") @@ -588,14 +562,11 @@ def my_asset( context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient, ): - job_name = context.run.job_name - args = self.BASE_ARGS + [ - "--full", - "--env", - f"--job-name={job_name}", "--report-asset-check", str(report_asset_check_path), + "--test-name", + _get_current_test_name(request), ] invocation_result = pipes_subprocess_client.run( From 8485cdfd7575e774e5c917e00ce563c51df503e3 Mon Sep 17 00:00:00 2001 From: GingerYouth Date: Sat, 28 Dec 2024 22:32:45 +0300 Subject: [PATCH 7/7] Tests refactoring --- .../src/main/java/pipes/utils/Example.java | 2 -- .../src/test/java/pipes/MainTest.java | 29 ++----------------- .../src/test/java/pipes/PipesTests.java | 7 ----- 3 files changed, 3 insertions(+), 35 deletions(-) diff --git a/libraries/pipes/implementations/java/dagster-pipes-java/src/main/java/pipes/utils/Example.java b/libraries/pipes/implementations/java/dagster-pipes-java/src/main/java/pipes/utils/Example.java index 7b03b44..7c1b601 100644 --- a/libraries/pipes/implementations/java/dagster-pipes-java/src/main/java/pipes/utils/Example.java +++ b/libraries/pipes/implementations/java/dagster-pipes-java/src/main/java/pipes/utils/Example.java @@ -3,7 +3,6 @@ import pipes.DagsterPipesException; import pipes.PipesContext; import pipes.PipesSession; -import pipes.data.PipesMetadata; import pipes.loaders.PipesContextLoader; import pipes.loaders.PipesDefaultContextLoader; import pipes.loaders.PipesEnvVarParamsLoader; @@ -11,7 +10,6 @@ import pipes.writers.PipesDefaultMessageWriter; import pipes.writers.PipesMessageWriter; import pipes.writers.PipesMessageWriterChannel; -import types.Type; import java.util.HashMap; import java.util.Map; diff --git a/libraries/pipes/implementations/java/dagster-pipes-java/src/test/java/pipes/MainTest.java b/libraries/pipes/implementations/java/dagster-pipes-java/src/test/java/pipes/MainTest.java index 67935f4..4aedfe9 100644 --- a/libraries/pipes/implementations/java/dagster-pipes-java/src/test/java/pipes/MainTest.java +++ b/libraries/pipes/implementations/java/dagster-pipes-java/src/test/java/pipes/MainTest.java @@ -29,13 +29,6 @@ public class MainTest implements Runnable { private final ObjectMapper objectMapper = new ObjectMapper(); private Map cachedJson = new ConcurrentHashMap<>(); -// @CommandLine.Option( -// names = {"--env"}, -// description = "Get DAGSTER_PIPES_MESSAGES & DAGSTER_PIPES_CONTEXT values " + -// "from environmental variables" -// ) -// private boolean env = false; - @CommandLine.Option( names = {"--job-name"}, description = "Provide value of 'jobName' for testing" @@ -48,12 +41,6 @@ public class MainTest implements Runnable { ) private String extras; -// @CommandLine.Option( -// names = {"--full"}, -// description = "Flag to test full PipesContext usage" -// ) -// private boolean full = false; - @CommandLine.Option( names = {"--custom-payload"}, description = "Specify custom payload path" @@ -72,18 +59,6 @@ public class MainTest implements Runnable { ) private String reportAssetMaterializationJson; -// @CommandLine.Option( -// names = {"--throw-error"}, -// description = "Throw exception in PipesSession with specified message" -// ) -// private boolean throwException = false; - -// @CommandLine.Option( -// names = {"--logging"}, -// description = "Flag to test logging" -// ) -// private boolean logging = false; - @CommandLine.Option( names = {"--message-writer"}, description = "Specify the type of the message writer: default,s3" @@ -159,7 +134,9 @@ public void run() { String checkName = loadParamByWrapperKey("checkName", String.class); boolean passed = loadParamByWrapperKey("passed", Boolean.class); String assetKey = loadParamByWrapperKey("assetKey", String.class); - PipesAssetCheckSeverity severity = PipesAssetCheckSeverity.valueOf(loadParamByWrapperKey("severity", String.class)); + PipesAssetCheckSeverity severity = PipesAssetCheckSeverity.valueOf( + loadParamByWrapperKey("severity", String.class) + ); pipesTests.setCheck(checkName, passed, assetKey, severity); } diff --git a/libraries/pipes/implementations/java/dagster-pipes-java/src/test/java/pipes/PipesTests.java b/libraries/pipes/implementations/java/dagster-pipes-java/src/test/java/pipes/PipesTests.java index 4d239f4..00461bf 100644 --- a/libraries/pipes/implementations/java/dagster-pipes-java/src/test/java/pipes/PipesTests.java +++ b/libraries/pipes/implementations/java/dagster-pipes-java/src/test/java/pipes/PipesTests.java @@ -17,9 +17,7 @@ @Disabled public class PipesTests { - private Map input; private PipesContextLoader contextLoader; - private PipesContextData contextData; private Map extras; private String jobName; private Object payload; @@ -41,10 +39,6 @@ public class PipesTests { //Message writer private PipesMessageWriter pipesMessageWriter; - void setInput(Map input) { - this.input = input; - } - void setExtras(Map extras) { this.extras = extras; } @@ -57,7 +51,6 @@ void setContextLoader(PipesContextLoader contextLoader) throws DagsterPipesExcep this.contextLoader = contextLoader; } - void setPayload(Object payload) { this.payload = payload; }