From 8b32062566a59f435a339502c85e4d8b730d1e0b Mon Sep 17 00:00:00 2001 From: Mikita Sakalouski <38785549+mikita-sakalouski@users.noreply.github.com> Date: Fri, 24 May 2024 16:12:06 +0200 Subject: [PATCH] [FEATURE] Switch testing to native hatch command (#8) ## Description Switch to `hatch test` command and improve tests. ## Related Issue #5 ## Motivation and Context Use native hatch approach ## How Has This Been Tested? Run all tests ## Types of changes - [ ] Bug fix (non-breaking change which fixes an issue) - [x] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Checklist: - [x] My code follows the code style of this project. - [ ] My change requires a change to the documentation. - [ ] I have updated the documentation accordingly. - [x] I have read the **CONTRIBUTING** document. - [ ] I have added tests to cover my changes. - [x] All new and existing tests passed. --------- Co-authored-by: Danny Meijer --- .github/images/logo_koheesio.svg | 31 ++ .github/workflows/test.yaml | 29 +- .gitignore | 8 +- Makefile | 20 +- README.md | 185 ++++---- koheesio/__about__.py | 17 +- koheesio/__init__.py | 9 +- koheesio/context.py | 2 +- koheesio/logger.py | 2 +- koheesio/models/__init__.py | 17 +- koheesio/models/sql.py | 2 +- koheesio/steps/asyncio/__init__.py | 6 +- koheesio/steps/asyncio/http.py | 1 + koheesio/steps/delta.py | 1 + koheesio/steps/http.py | 2 +- koheesio/steps/integrations/box.py | 11 +- .../integrations/dq/spark_expectations.py | 6 +- .../steps/integrations/notifications/slack.py | 2 +- .../steps/integrations/secrets/__init__.py | 2 +- koheesio/steps/integrations/snowflake.py | 2 +- koheesio/steps/integrations/sso/okta.py | 2 +- koheesio/steps/readers/__init__.py | 2 +- .../steps/readers/databricks/autoloader.py | 2 +- koheesio/steps/readers/file_loader.py | 6 +- koheesio/steps/readers/memory.py | 2 +- koheesio/steps/readers/rest_api.py | 9 +- koheesio/steps/spark.py | 3 +- koheesio/steps/step.py | 10 +- koheesio/steps/transformations/__init__.py | 6 +- koheesio/steps/transformations/arrays.py | 2 +- .../steps/transformations/camel_to_snake.py | 4 +- .../transformations/date_time/__init__.py | 3 +- .../transformations/date_time/interval.py | 8 +- koheesio/steps/transformations/lookup.py | 6 +- .../transformations/strings/change_case.py | 12 +- .../steps/transformations/strings/split.py | 8 +- .../steps/transformations/strings/trim.py | 8 +- koheesio/steps/transformations/transform.py | 2 +- koheesio/steps/transformations/uuid5.py | 4 +- koheesio/steps/writers/__init__.py | 2 +- koheesio/steps/writers/buffer.py | 29 +- koheesio/steps/writers/delta/batch.py | 3 +- koheesio/steps/writers/delta/scd.py | 4 +- koheesio/steps/writers/delta/stream.py | 2 +- koheesio/steps/writers/sftp.py | 4 +- koheesio/steps/writers/stream.py | 2 +- koheesio/utils/__init__.py | 2 +- pyproject.toml | 406 ++++++++++-------- {test => tests}/_data/context/common.yml | 0 {test => tests}/_data/context/dev.yml | 0 {test => tests}/_data/context/sample.json | 0 {test => tests}/_data/context/sample.toml | 0 {test => tests}/_data/context/sample.yaml | 0 ...-3943-4701-b0ae-cb6bc7e015e4-c000.avro.crc | Bin ...3fda-3943-4701-b0ae-cb6bc7e015e4-c000.avro | Bin .../_data/readers/csv_file/dummy.csv | 0 .../readers/csv_file/dummy_semicolon.csv | 0 ...-896a-6a12362bf370-c000.snappy.parquet.crc | Bin .../_delta_log/.00000000000000000000.json.crc | Bin .../_delta_log/00000000000000000000.json | 0 
...4d48-896a-6a12362bf370-c000.snappy.parquet | Bin .../_data/readers/json_file/dummy.json | 0 .../_data/readers/json_file/dummy_simple.json | 0 ...44b1-baf4-d263f748b060-c000.snappy.orc.crc | Bin ...ef4-44b1-baf4-d263f748b060-c000.snappy.orc | Bin .../_data/sql/spark_sql_reader.sql | 0 .../_data/steps/expected_step_output.yaml | 0 .../steps/expected_step_output_simple.yaml | 0 .../_data/transformations/dummy.sql | 0 .../test_product_dq_stats.ddl | 0 .../test_product_rules.ddl | 0 .../test_product_rules.sql | 0 .../string_data/100000_rows_with_strings.json | 0 {test => tests}/conftest.py | 0 {test => tests}/core/test_context.py | 7 +- {test => tests}/core/test_init.py | 0 {test => tests}/core/test_logger.py | 2 +- {test => tests}/core/test_steps.py | 4 +- .../deprecated/nike/ada/__init__.py | 0 {test => tests}/models/test_models.py | 3 +- {test => tests}/spark/conftest.py | 5 +- .../spark/integrations/box/test_box.py | 3 + .../dq/test_spark_expectations.py | 31 +- .../integrations/snowflake/test_snowflake.py | 3 + .../integrations/snowflake/test_sync_task.py | 8 +- .../spark/readers/test_auto_loader.py | 5 +- .../spark/readers/test_delta_reader.py | 3 + .../spark/readers/test_file_loader.py | 2 + {test => tests}/spark/readers/test_hana.py | 4 + {test => tests}/spark/readers/test_jdbc.py | 3 + {test => tests}/spark/readers/test_memory.py | 3 + .../spark/readers/test_metastore_reader.py | 4 + {test => tests}/spark/readers/test_reader.py | 4 + .../spark/readers/test_rest_api.py | 5 +- .../spark/readers/test_spark_sql_reader.py | 6 +- .../spark/readers/test_teradata.py | 4 + {test => tests}/spark/tasks/test_etl_task.py | 4 + {test => tests}/spark/test_delta.py | 4 + {test => tests}/spark/test_spark.py | 4 +- {test => tests}/spark/test_warnings.py | 2 + .../date_time/test_date_time.py | 2 + .../date_time/test_interval.py | 3 + .../strings/test_change_case.py | 2 + .../transformations/strings/test_concat.py | 2 + .../spark/transformations/strings/test_pad.py | 2 + .../transformations/strings/test_regexp.py | 2 + .../transformations/strings/test_split.py | 2 + .../strings/test_string_replace.py | 2 + .../transformations/strings/test_substring.py | 2 + .../transformations/strings/test_trim.py | 3 + .../spark/transformations/test_arrays.py | 3 + .../test_camel_to_snake_transform.py | 3 + .../transformations/test_cast_to_datatype.py | 4 + .../spark/transformations/test_drop_column.py | 2 + .../spark/transformations/test_get_item.py | 2 + .../spark/transformations/test_hash.py | 2 + .../spark/transformations/test_lookup.py | 3 + .../spark/transformations/test_repartition.py | 2 + .../spark/transformations/test_replace.py | 2 + .../transformations/test_row_number_dedup.py | 3 + .../transformations/test_sql_transform.py | 2 + .../spark/transformations/test_transform.py | 4 + .../transformations/test_transformation.py | 3 + .../spark/transformations/test_uuid5.py | 2 + .../spark/writers/delta/test_delta_writer.py | 4 + .../spark/writers/delta/test_scd.py | 5 + {test => tests}/spark/writers/test_buffer.py | 3 + {test => tests}/spark/writers/test_dummy.py | 2 + {test => tests}/spark/writers/test_sftp.py | 2 + {test => tests}/spark/writers/test_stream.py | 2 + .../steps/asyncio/test_asyncio_http.py | 3 +- .../integrations/notifications/test_slack.py | 0 .../steps/integrations/sso/test_okta.py | 5 +- {test => tests}/steps/test_http.py | 0 {test => tests}/utils/test_utils.py | 1 + 135 files changed, 688 insertions(+), 429 deletions(-) create mode 100644 .github/images/logo_koheesio.svg rename {test => 
tests}/_data/context/common.yml (100%) rename {test => tests}/_data/context/dev.yml (100%) rename {test => tests}/_data/context/sample.json (100%) rename {test => tests}/_data/context/sample.toml (100%) rename {test => tests}/_data/context/sample.yaml (100%) rename {test => tests}/_data/readers/avro_file/.part-00000-c95e3fda-3943-4701-b0ae-cb6bc7e015e4-c000.avro.crc (100%) rename {test => tests}/_data/readers/avro_file/part-00000-c95e3fda-3943-4701-b0ae-cb6bc7e015e4-c000.avro (100%) rename {test => tests}/_data/readers/csv_file/dummy.csv (100%) rename {test => tests}/_data/readers/csv_file/dummy_semicolon.csv (100%) rename {test => tests}/_data/readers/delta_file/.part-00000-0f8b246f-e779-4d48-896a-6a12362bf370-c000.snappy.parquet.crc (100%) rename {test => tests}/_data/readers/delta_file/_delta_log/.00000000000000000000.json.crc (100%) rename {test => tests}/_data/readers/delta_file/_delta_log/00000000000000000000.json (100%) rename {test => tests}/_data/readers/delta_file/part-00000-0f8b246f-e779-4d48-896a-6a12362bf370-c000.snappy.parquet (100%) rename {test => tests}/_data/readers/json_file/dummy.json (100%) rename {test => tests}/_data/readers/json_file/dummy_simple.json (100%) rename {test => tests}/_data/readers/orc_file/.part-00000-ddcce057-2ef4-44b1-baf4-d263f748b060-c000.snappy.orc.crc (100%) rename {test => tests}/_data/readers/orc_file/part-00000-ddcce057-2ef4-44b1-baf4-d263f748b060-c000.snappy.orc (100%) rename {test => tests}/_data/sql/spark_sql_reader.sql (100%) rename {test => tests}/_data/steps/expected_step_output.yaml (100%) rename {test => tests}/_data/steps/expected_step_output_simple.yaml (100%) rename {test => tests}/_data/transformations/dummy.sql (100%) rename {test => tests}/_data/transformations/spark_expectations_resources/test_product_dq_stats.ddl (100%) rename {test => tests}/_data/transformations/spark_expectations_resources/test_product_rules.ddl (100%) rename {test => tests}/_data/transformations/spark_expectations_resources/test_product_rules.sql (100%) rename {test => tests}/_data/transformations/string_data/100000_rows_with_strings.json (100%) rename {test => tests}/conftest.py (100%) rename {test => tests}/core/test_context.py (98%) rename {test => tests}/core/test_init.py (100%) rename {test => tests}/core/test_logger.py (99%) rename {test => tests}/core/test_steps.py (99%) rename {test => tests}/deprecated/nike/ada/__init__.py (100%) rename {test => tests}/models/test_models.py (99%) rename {test => tests}/spark/conftest.py (97%) rename {test => tests}/spark/integrations/box/test_box.py (99%) rename {test => tests}/spark/integrations/dq/test_spark_expectations.py (89%) rename {test => tests}/spark/integrations/snowflake/test_snowflake.py (99%) rename {test => tests}/spark/integrations/snowflake/test_sync_task.py (99%) rename {test => tests}/spark/readers/test_auto_loader.py (97%) rename {test => tests}/spark/readers/test_delta_reader.py (99%) rename {test => tests}/spark/readers/test_file_loader.py (99%) rename {test => tests}/spark/readers/test_hana.py (94%) rename {test => tests}/spark/readers/test_jdbc.py (98%) rename {test => tests}/spark/readers/test_memory.py (98%) rename {test => tests}/spark/readers/test_metastore_reader.py (87%) rename {test => tests}/spark/readers/test_reader.py (89%) rename {test => tests}/spark/readers/test_rest_api.py (99%) rename {test => tests}/spark/readers/test_spark_sql_reader.py (85%) rename {test => tests}/spark/readers/test_teradata.py (94%) rename {test => tests}/spark/tasks/test_etl_task.py (98%) rename {test => 
tests}/spark/test_delta.py (99%) rename {test => tests}/spark/test_spark.py (90%) rename {test => tests}/spark/test_warnings.py (96%) rename {test => tests}/spark/transformations/date_time/test_date_time.py (99%) rename {test => tests}/spark/transformations/date_time/test_interval.py (99%) rename {test => tests}/spark/transformations/strings/test_change_case.py (99%) rename {test => tests}/spark/transformations/strings/test_concat.py (99%) rename {test => tests}/spark/transformations/strings/test_pad.py (99%) rename {test => tests}/spark/transformations/strings/test_regexp.py (99%) rename {test => tests}/spark/transformations/strings/test_split.py (99%) rename {test => tests}/spark/transformations/strings/test_string_replace.py (98%) rename {test => tests}/spark/transformations/strings/test_substring.py (98%) rename {test => tests}/spark/transformations/strings/test_trim.py (99%) rename {test => tests}/spark/transformations/test_arrays.py (99%) rename {test => tests}/spark/transformations/test_camel_to_snake_transform.py (97%) rename {test => tests}/spark/transformations/test_cast_to_datatype.py (99%) rename {test => tests}/spark/transformations/test_drop_column.py (98%) rename {test => tests}/spark/transformations/test_get_item.py (98%) rename {test => tests}/spark/transformations/test_hash.py (99%) rename {test => tests}/spark/transformations/test_lookup.py (98%) rename {test => tests}/spark/transformations/test_repartition.py (98%) rename {test => tests}/spark/transformations/test_replace.py (99%) rename {test => tests}/spark/transformations/test_row_number_dedup.py (99%) rename {test => tests}/spark/transformations/test_sql_transform.py (99%) rename {test => tests}/spark/transformations/test_transform.py (98%) rename {test => tests}/spark/transformations/test_transformation.py (99%) rename {test => tests}/spark/transformations/test_uuid5.py (98%) rename {test => tests}/spark/writers/delta/test_delta_writer.py (99%) rename {test => tests}/spark/writers/delta/test_scd.py (99%) rename {test => tests}/spark/writers/test_buffer.py (99%) rename {test => tests}/spark/writers/test_dummy.py (94%) rename {test => tests}/spark/writers/test_sftp.py (99%) rename {test => tests}/spark/writers/test_stream.py (98%) rename {test => tests}/steps/asyncio/test_asyncio_http.py (99%) rename {test => tests}/steps/integrations/notifications/test_slack.py (100%) rename {test => tests}/steps/integrations/sso/test_okta.py (97%) rename {test => tests}/steps/test_http.py (100%) rename {test => tests}/utils/test_utils.py (99%) diff --git a/.github/images/logo_koheesio.svg b/.github/images/logo_koheesio.svg new file mode 100644 index 00000000..cdfeab2b --- /dev/null +++ b/.github/images/logo_koheesio.svg @@ -0,0 +1,31 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index ff8c5762..67d0612c 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -27,18 +27,18 @@ jobs: os: [ubuntu-latest] # os: [ubuntu-latest, windows-latest, macos-latest] python-version: ['3.9', '3.10', '3.11', '3.12'] - pyspark-version: ['33', '34', '35'] - exclude: - - python-version: '3.9' - pyspark-version: '35' - - python-version: '3.11' - pyspark-version: '33' - - python-version: '3.11' - pyspark-version: '34' - - python-version: '3.12' - pyspark-version: '33' - - python-version: '3.12' - pyspark-version: '34' + # pyspark-version: ['33', '34', '35'] + # exclude: + # - python-version: '3.9' + # pyspark-version: '35' + # - python-version: 
'3.11' + # pyspark-version: '33' + # - python-version: '3.11' + # pyspark-version: '34' + # - python-version: '3.12' + # pyspark-version: '33' + # - python-version: '3.12' + # pyspark-version: '34' steps: - uses: actions/checkout@v4 @@ -52,10 +52,11 @@ jobs: run: python -m pip install --upgrade pip - name: Install hatch - run: pip install hatch + run: pip install hatch - name: Run tests - run: hatch run test.py${{ matrix.python-version }}-pyspark${{ matrix.pyspark-version }}:all-tests + # run: hatch run test --python=${{ matrix.python-version }} -i pyspark${{ matrix.pyspark-version }} + run: hatch test --python=${{ matrix.python-version }} # https://github.com/marketplace/actions/alls-green#why final_check: # This job does nothing and is only used for the branch protection diff --git a/.gitignore b/.gitignore index 9c444054..0184ed10 100644 --- a/.gitignore +++ b/.gitignore @@ -126,16 +126,16 @@ spark-warehouse out/** *.iml -/test/integration/**/task_definition/*.yaml +/tests/integration/**/task_definition/*.yaml /.vscode/settings.json /.vscode/launch.json .databricks # PYTEST -/test/data/spark-warehouse -/test/data/checkpoint -/test/data/snowflake_staging.parq/ +/tests/_data/spark-warehouse +/tests/_data/checkpoint +/tests/_data/snowflake_staging.parq/ # DevContainer .devcontainer diff --git a/Makefile b/Makefile index e21d0f14..163a4fdc 100644 --- a/Makefile +++ b/Makefile @@ -105,27 +105,27 @@ coverage: cov all-tests: @echo "\033[1mRunning all tests:\033[0m\n\033[35m This will run the full test suite\033[0m" @echo "\033[1;31mWARNING:\033[0;33m This may take upward of 20-30 minutes to complete!\033[0m" - @hatch run test:all-tests --no-header --no-summary + @hatch test --no-header --no-summary .PHONY: spark-tests ## testing - Run SPARK tests in ALL environments spark-tests: @echo "\033[1mRunning Spark tests:\033[0m\n\033[35m This will run the Spark test suite against all specified environments\033[0m" @echo "\033[1;31mWARNING:\033[0;33m This may take upward of 20-30 minutes to complete!\033[0m" - @hatch run test:spark-tests --no-header --no-summary + @hatch test -m spark --no-header --no-summary .PHONY: non-spark-tests ## testing - Run non-spark tests in ALL environments non-spark-tests: @echo "\033[1mRunning non-Spark tests:\033[0m\n\033[35m This will run the non-Spark test suite against all specified environments\033[0m" - @hatch run test:non-spark-tests --no-header --no-summary -k "not spark" + @hatch test -m "not spark" --no-header --no-summary -.PHONY: pytest ## testing - Run pytest, with all tests in the dev environment -pytest: +.PHONY: dev-test ## testing - Run pytest, with all tests in the dev environment +dev-test: @echo "\033[1mRunning pytest:\033[0m\n\033[35m This will run the full test suite, but only once (in the dev environment)\033[0m" - @hatch run dev:all-tests -vv -.PHONY: pytest-spark ## testing - Run pytest (just spark) in the dev environment -pytest-spark: + @hatch run dev:test -vv +.PHONY: dev-test-spark ## testing - Run pytest (just spark) in the dev environment +dev-test-spark: @echo "\033[1mRunning pytest for Spark tests:\033[0m\n\033[35m This will run the Spark test suite\033[0m" @hatch run dev:spark-tests -vv -.PHONY: pytest-non-spark ## testing - Run pytest without spark in the dev environment -pytest-non-spark: +.PHONY: dev-test-non-spark ## testing - Run pytest without spark in the dev environment +dev-test-non-spark: @echo "\033[1mRunning pytest for non-Spark tests:\033[0m\n\033[35m This will run the test suite, excluding all spark tests\033[0m" @hatch run 
dev:non-spark-tests -vv diff --git a/README.md b/README.md index 5cea8c16..0e52cd8c 100644 --- a/README.md +++ b/README.md @@ -1,46 +1,29 @@ # Koheesio -![alert_status](https://github.com/Nike-Inc/koheesio/blob/badges/badges/alert_status.svg) -![bugs](https://github.com/Nike-Inc/koheesio/blob/badges/badges/bugs.svg) -![code_smells](https://github.com/Nike-Inc/koheesio/blob/badges/badges/code_smells.svg) -![coverage](https://github.com/Nike-Inc/koheesio/blob/badges/badges/coverage.svg) -![duplicated_lines_density](https://github.com/Nike-Inc/koheesio/blob/badges/badges/duplicated_lines_density.svg) -![ncloc](https://github.com/Nike-Inc/koheesio/blob/badges/badges/ncloc.svg) -![reliability_rating](https://github.com/Nike-Inc/koheesio/blob/badges/badges/reliability_rating.svg) -![security_rating](https://github.com/Nike-Inc/koheesio/blob/badges/badges/security_rating.svg) -![sqale_index](https://github.com/Nike-Inc/koheesio/blob/badges/badges/sqale_index.svg) -![sqale_rating](https://github.com/Nike-Inc/koheesio/blob/badges/badges/sqale_rating.svg) -![vulnerabilities](https://github.com/Nike-Inc/koheesio/blob/badges/badges/vulnerabilities.svg) - - [![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black) - [![Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff) - ![python_version](https://img.shields.io/badge/python->=3.9<3.11-orange) -![docstring](https://img.shields.io/badge/docstring-numpy-blue) - - -```pythonregexp - $$\ $$\ $$\ $$\ - $$ |$$ | $$ | \__| - $$ /$$ / $$$$$$\ $$$$$$$\ $$$$$$\ $$$$$$\ $$$$$$$\ $$\ $$$$$$\ - $$ /$$ / $$ __$$\ $$ __$$\ $$ __$$\ $$ __$$\ $$ _____|$$ |$$ __$$\ - $$ / \$$< $$ / $$ |$$ | $$ |$$$$$$$$ |$$$$$$$$ |\$$$$$$\ $$ |$$ / $$ | - $$ / \$$\ $$ | $$ |$$ | $$ |$$ ____|$$ ____| \____$$\ $$ |$$ | $$ | -$$ / \$$\\$$$$$$ |$$ | $$ |\$$$$$$$\ \$$$$$$$\ $$$$$$$ |$$ |\$$$$$$ | -\__/ \__|\______/ \__| \__| \_______| \_______|\_______/ \__| \______/ -``` -> `a framework that can be used to build advanced data-pipelines by simple Python implementations` +
+ +Koheesio logo + +| | | +| --- | --- | +| Meta | [![linting - Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff) [![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black) [![License - Apache-2.0](https://img.shields.io/github/license/Nike-Inc/koheesio)](LICENSE.txt) | + +
+----- + +A framework that can be used to build advanced data-pipelines by simple Python implementations ## Introduction Koheesio is aimed at those building data processing pipelines in Python, particularly useful for large-scale data. -Koheesio, named after the Finnish word for cohesion, is a Python framework for building type-safe and efficient Data -Pipelines. It promotes modularity, composability, and collaboration, allowing for the creation of complex pipelines +Koheesio, named after the Finnish word for cohesion, is a Python framework for building type-safe and efficient Data +Pipelines. It promotes modularity, composability, and collaboration, allowing for the creation of complex pipelines from simple, reusable components. -Koheesio's goal is to ensure predictable pipeline execution through a solid foundation of well-tested code and a rich -set of features, making it an excellent choice for developers and organizations seeking to build robust and adaptable +Koheesio's goal is to ensure predictable pipeline execution through a solid foundation of well-tested code and a rich +set of features, making it an excellent choice for developers and organizations seeking to build robust and adaptable Data Pipelines. - It is designed to be modular and composable, allowing for the creation of complex pipelines from simple components. @@ -100,17 +83,17 @@ Additionally, Koheesio also uses [Pydantic] for strong typing, data validation, Here are the key components included in Koheesio: -* __Step__: This is the fundamental unit of work in Koheesio. It represents a single operation in a data pipeline, +- __Step__: This is the fundamental unit of work in Koheesio. It represents a single operation in a data pipeline, taking in inputs and producing outputs. -* __Task__: This is a larger unit of work, typically encompassing an entire Extract - Transform - Load process for a +- __Task__: This is a larger unit of work, typically encompassing an entire Extract - Transform - Load process for a data object. -* __Context__: This is a configuration class used to set up the environment for a Task. It can be used to share +- __Context__: This is a configuration class used to set up the environment for a Task. It can be used to share variables across tasks and adapt the behavior of a Task based on its environment. -* __Logger__: This is a class for logging messages at different levels. -* __Reader__: This is a type of Step that reads data from a source and stores the result (to make it available for +- __Logger__: This is a class for logging messages at different levels. +- __Reader__: This is a type of Step that reads data from a source and stores the result (to make it available for subsequent steps) -* __Writer__: This controls how data is written to the output in both batch and streaming contexts. -* __Transformation__: This is a type of Step that takes a DataFrame as input and returns a DataFrame as output. +- __Writer__: This controls how data is written to the output in both batch and streaming contexts. +- __Transformation__: This is a type of Step that takes a DataFrame as input and returns a DataFrame as output. ```text ┌─────────┐ ┌──────────────────┐ ┌─────────┐ @@ -131,66 +114,70 @@ The interactions between the base concepts of the model is visible in the below Below sections provide some more details on every Core Component ### Step -The Step is the fundamental unit of work in Koheesio. 
It represents a single operation in a data pipeline, taking in + +The Step is the fundamental unit of work in Koheesio. It represents a single operation in a data pipeline, taking in inputs and producing outputs. -For example, a Step could be a process that takes in raw data, cleans it, and outputs the cleaned data. Each Step is +For example, a Step could be a process that takes in raw data, cleans it, and outputs the cleaned data. Each Step is designed to be self-contained and reusable, meaning it can be used in multiple tasks or pipelines without modification. ### Task -A Task in Koheesio is a larger unit of work, typically encompassing an entire Extract - Transform - Load (ETL) process + +A Task in Koheesio is a larger unit of work, typically encompassing an entire Extract - Transform - Load (ETL) process for a data object. -For example, a Task could involve extracting data from a database, transforming the data using several Steps, and then +For example, a Task could involve extracting data from a database, transforming the data using several Steps, and then loading the transformed data into a data warehouse. Tasks are designed to be modular and composable, allowing developers to build complex data pipelines by combining multiple Tasks. Task is a subclass of Step. ### Context -The Context in Koheesio is a configuration class/interface used to set up the environment for a Task. It can be used to + +The Context in Koheesio is a configuration class/interface used to set up the environment for a Task. It can be used to share variables across tasks and adapt the behavior of a Task based on its environment. -For example, a Context could be used to specify the database connection details for a Task, or to specify the output -format for a Writer. The Context is designed to be flexible and adaptable, allowing developers to easily configure their -tasks to suit different environments or requirements. Note that the use of Context is not bound to Tasks; Task is used +For example, a Context could be used to specify the database connection details for a Task, or to specify the output +format for a Writer. The Context is designed to be flexible and adaptable, allowing developers to easily configure their +tasks to suit different environments or requirements. Note that the use of Context is not bound to Tasks; Task is used illustratively in this description. -Context can be created from yaml files, dictionaries, or json files. It also possible to merge multiple Contexts +Context can be created from yaml files, dictionaries, or json files. It also possible to merge multiple Contexts together. Koheesio also has the notion of a 'SecretContext', which can be used to store sensitive information such as passwords or API keys without exposing them in the code. ### Logger -The Logger in Koheesio is a class/interface for logging messages at different levels. It can be used to log debug + +The Logger in Koheesio is a class/interface for logging messages at different levels. It can be used to log debug messages, informational messages, warnings, errors, and critical errors. -For example, a Logger could be used to log the progress of a Task, or to log any errors that occur during the execution -of a Task. The Logger is designed to be easy to use and flexible, allowing developers to easily monitor the progress and +For example, a Logger could be used to log the progress of a Task, or to log any errors that occur during the execution +of a Task. 
The Logger is designed to be easy to use and flexible, allowing developers to easily monitor the progress and status of their tasks. The API of logger is designed to mimic that of those found in Scala. Click [here](docs/concepts/logging.md) to see logger examples. - ### Reader -The Reader in Koheesio is a type of Step that reads data from a source and stores the result. For example, a Reader -could be used to read data from a CSV file, a database, or an API. The Reader is designed to be flexible and adaptable, + +The Reader in Koheesio is a type of Step that reads data from a source and stores the result. For example, a Reader +could be used to read data from a CSV file, a database, or an API. The Reader is designed to be flexible and adaptable, allowing developers to easily read data from a wide range of sources. Reader is a subclass of Step - ### Writer -The Writer in Koheesio controls how data is written to the output in both batch and streaming contexts. For example, a -Writer could be used to write data to a CSV file, a database, or an API. The Writer is designed to be flexible and + +The Writer in Koheesio controls how data is written to the output in both batch and streaming contexts. For example, a +Writer could be used to write data to a CSV file, a database, or an API. The Writer is designed to be flexible and adaptable, allowing developers to easily write data to a wide range of destinations. Writer is a subclass of Step. - ### Transformation -The Transformation in Koheesio is a type of Step that takes a DataFrame as input and returns a DataFrame as output. For -example, a Transformation could be used to clean data, aggregate data, or perform complex calculations on data. The -Transformation is designed to be flexible and powerful, allowing developers to easily perform a wide range of data + +The Transformation in Koheesio is a type of Step that takes a DataFrame as input and returns a DataFrame as output. For +example, a Transformation could be used to clean data, aggregate data, or perform complex calculations on data. The +Transformation is designed to be flexible and powerful, allowing developers to easily perform a wide range of data transformations. Transformation is a subclass of Step. @@ -199,7 +186,6 @@ Transformation is a subclass of Step. You can install Koheesio using either pip or poetry. - ### Using Pip To install Koheesio using pip, run the following command in your terminal: @@ -208,14 +194,15 @@ To install Koheesio using pip, run the following command in your terminal: pip install koheesio ``` - ### Using Poetry If you're using poetry for package management, you can add Koheesio to your project with the following command: + ```bash poetry add koheesio ``` -or add the following line to your `pyproject.toml` (under `[tool.poetry.dependencies]`), making sure to replace + +or add the following line to your `pyproject.toml` (under `[tool.poetry.dependencies]`), making sure to replace `...` with the version you want to have installed: ```toml @@ -223,30 +210,27 @@ koheesio = {version = "..."} ``` ### Extras + Koheesio also provides some additional features that can be useful in certain scenarios. These include: -- **Spark Expectations:** Available through the `koheesio.steps.integration.dq` module; installable through the `se` extra. - - SE Provides Data Quality checks for Spark DataFrames. - - For more information, refer to the [Spark Expectations docs](https://engineering.nike.com/spark-expectations). 
+- __Spark Expectations:__ Available through the `koheesio.steps.integration.dq` module; installable through the `se` extra. + - SE Provides Data Quality checks for Spark DataFrames. + - For more information, refer to the [Spark Expectations docs](https://engineering.nike.com/spark-expectations). [//]: # (- **Brickflow:** Available through the `koheesio.steps.integration.workflow` module; installable through the `bf` extra.) [//]: # ( - Brickflow is a workflow orchestration tool that allows you to define and execute workflows in a declarative way.) [//]: # ( - For more information, refer to the [Brickflow docs](https://engineering.nike.com/brickflow)) -- **Box**: Available through the `koheesio.steps.integration.box` module; installable through the `box` extra. - - Box is a cloud content management and file sharing service for businesses. +- __Box__: Available through the `koheesio.steps.integration.box` module; installable through the `box` extra. + - Box is a cloud content management and file sharing service for businesses. -- **Cerberus**: Available through the `koheesio.steps.integration.secrets` module; installable through the `cerberus` extra. - - Cerberus is a tool for managing secrets in a secure and scalable way. - - For more information, refer to the [Cerberus docs](https://engineering.nike.com/cerberus) - -> **Note:** +> __Note:__ > Some of the steps require extra dependencies. See the [Extras](#extras) section for additional info. > Extras can be added to Poetry by adding `extras=['name_of_the_extra']` to the toml entry mentioned above ## Getting Started -In this section, we will provide an example of how you can use Koheesio. This example uses all the Core Concepts +In this section, we will provide an example of how you can use Koheesio. This example uses all the Core Concepts (except an explicit logger) to show how it can be combined end-to-end. Consider this code snippet: @@ -336,7 +320,7 @@ Based on the `DummyReader()`, we start with an input DataFrame that looks like t The transform method we specified in our custom Task adds a new column called "MyFavoriteMovie" to the input DataFrame, filled with the value of `my_favorite_movie` that we provided while initializing the class. -Given that the `CamelToSnakeTransformation` changes the column names into snake case, the output DataFrame from `task.run()` will have two columns: 'id' and 'my_favorite_movie'. The 'id' column will contain 100 rows of data (as provided by DummyReader), and the 'my_favorite_movie' column will contain the same value "inception" for all rows. +Given that the `CamelToSnakeTransformation` changes the column names into snake case, the output DataFrame from `task.run()` will have two columns: 'id' and 'my_favorite_movie'. The 'id' column will contain 100 rows of data (as provided by DummyReader), and the 'my_favorite_movie' column will contain the same value "inception" for all rows. The DummyWriter then performs a df.show() on the DataFrame and returns the first row of data as a dict. So, the final output of task.run() would be a dictionary representing the first row of the DataFrame, something like this: @@ -355,53 +339,58 @@ The data in this DataFrame looks like this: | 100 | inception | ## Custom Implementations -In Koheesio, custom implementations can replace the provided implementations as long as they match the API. Here are + +In Koheesio, custom implementations can replace the provided implementations as long as they match the API. Here are some examples: -1. 
__Custom Step__: You can create a custom Step that performs a specific operation not covered by the built-in Steps. +1. __Custom Step__: You can create a custom Step that performs a specific operation not covered by the built-in Steps. For example, you might create a Step that performs a specific type of data validation or a complex transformation. + ```python class CustomStep(Step): def execute(self): # Implement your custom logic here ``` -2. __Custom Reader__: You can create a custom Reader that reads data from a specific source not covered by the built-in +2. __Custom Reader__: You can create a custom Reader that reads data from a specific source not covered by the built-in Readers. For example, you might create a Reader that reads data from a proprietary database or a specific API. + ```python class CustomReader(Reader): def execute(self): # Implement your custom logic here ``` -3. __Custom Writer__: You can create a custom Writer that writes data to a specific destination not covered by the - built-in Writers. For example, you might create a Writer that writes data to a proprietary database or a specific +3. __Custom Writer__: You can create a custom Writer that writes data to a specific destination not covered by the + built-in Writers. For example, you might create a Writer that writes data to a proprietary database or a specific API. + ```python class CustomWriter(Writer): def execute(self): # Implement your custom logic here ``` -4. __Custom Transformation__: You can create a custom Transformation that performs a specific transformation not - covered by the built-in Transformations. For example, you might create a Transformation that performs a complex +4. __Custom Transformation__: You can create a custom Transformation that performs a specific transformation not + covered by the built-in Transformations. For example, you might create a Transformation that performs a complex calculation or a specific type of data cleaning. + ```python class CustomTransformation(Transformation): def execute(self): # Implement your custom logic here ``` -Note that in all the above examples, the `execute` method is the only method that needs to be implemented. This is +Note that in all the above examples, the `execute` method is the only method that needs to be implemented. This is per design. -Remember, the key to creating custom implementations in Koheesio is to ensure they match the API of the component -they're replacing. This ensures they can be used interchangeably with the built-in components. Check out the API +Remember, the key to creating custom implementations in Koheesio is to ensure they match the API of the component +they're replacing. This ensures they can be used interchangeably with the built-in components. Check out the API Documentation for more details on the many different components available in Koheesio. -Before creating custom implementations, it's a good idea to check if there's already a built-in component that can be -used instead. This will save you time and effort in the long run. When you do create custom implementations, read -through the API documentation belonging to the Step and StepOutput modules. +Before creating custom implementations, it's a good idea to check if there's already a built-in component that can be +used instead. This will save you time and effort in the long run. When you do create custom implementations, read +through the API documentation belonging to the Step and StepOutput modules. 
## Contributing @@ -409,21 +398,21 @@ through the API documentation belonging to the Step and StepOutput modules. We welcome contributions to our project! Here's a brief overview of our development process: -* __Code Standards__: We use `pylint`, `black`, and `mypy` to maintain code standards. Please ensure your code passes - these checks by running `make check`. No errors or warnings should be reported by the linter before you submit a pull +- __Code Standards__: We use `pylint`, `black`, and `mypy` to maintain code standards. Please ensure your code passes + these checks by running `make check`. No errors or warnings should be reported by the linter before you submit a pull request. -* __Testing__: We use `pytest` for testing. Run the tests with `make test` and ensure all tests pass before submitting +- __Testing__: We use `pytest` for testing. Run the tests with `make test` and ensure all tests pass before submitting a pull request. -* __Release Process__: We aim for frequent releases. Typically when we have a new feature or bugfix, a developer with +- __Release Process__: We aim for frequent releases. Typically when we have a new feature or bugfix, a developer with admin rights will create a new release on GitHub and publish the new version to PyPI. -For more detailed information, please refer to our [contribution guidelines](./docs/contribute.md). We also adhere to +For more detailed information, please refer to our [contribution guidelines](./docs/contribute.md). We also adhere to [Nike's Code of Conduct](https://github.com/Nike-Inc/nike-inc.github.io/blob/master/CONDUCT.md) and [Nike's Individual Contributor License Agreement](https://www.clahub.com/agreements/Nike-Inc/fastbreak). ### Additional Resources -* [General GitHub documentation](https://help.github.com/) -* [GitHub pull request documentation](https://help.github.com/send-pull-requests/) -* [Nike OSS](https://nike-inc.github.io/) +- [General GitHub documentation](https://help.github.com/) +- [GitHub pull request documentation](https://help.github.com/send-pull-requests/) +- [Nike OSS](https://nike-inc.github.io/) diff --git a/koheesio/__about__.py b/koheesio/__about__.py index 586ac232..85772490 100644 --- a/koheesio/__about__.py +++ b/koheesio/__about__.py @@ -12,13 +12,16 @@ LICENSE_INFO = "Licensed as Apache 2.0" SOURCE = "https://github.com/Nike-Inc/koheesio" -__version__ = "0.7.0.dev0" -__logo__ = 75, ( - b"\x1f\x8b\x08\x00TiGf\x02\xff}\x91\xbb\r\xc30\x0cD{Nq\x1bh\n\x01\x16R\xa4pK@\x8bh\xf8\xe8\xf8\x89\xe9\x04\xf0\x15" - b"\xc4\x91\x10\x9f(J`z\xbd4B\xea8J\xf2\xa01T\x02\x01,\x0b\x85Q\x92\x07\xe9\x9cK\x92\xd1,\xe0mRBL\x9c\xa6\x9b\xee" - b"\xeet)\x07Av\xc9/\x0b\x98\x93\xb4=\xd1v\xa4\xf5NG\xc6\xe5\xce\x93nk\x8d\x81\xf5\xed\x92\x80AmC\xbb\xde,.\x7f\x1fc" - b"\x0fU\xa79\x19\x82\x16]\x1248\x8f\xa5\x7f\x1c|\x92\xe2\xb8\xa59\xfd\xa5\x86\x8b.I\x9a\xf3\xd4W\x80\x8a\xd3\x9e" - b"\xfb\xba\\\xecm\x9f#\xee\xea\x92}M+\xffb\xb7\xb2\xc4\xc4K\x88Zui\xda\xedD\xfb\x00\xcfU6\xd3_\x02\x00\x00" +__version__ = "0.7.0.dev1" +__logo__ = ( + 75, + ( + b"\x1f\x8b\x08\x00TiGf\x02\xff}\x91\xbb\r\xc30\x0cD{Nq\x1bh\n\x01\x16R\xa4pK@\x8bh\xf8\xe8\xf8\x89\xe9\x04\xf0\x15" + b"\xc4\x91\x10\x9f(J`z\xbd4B\xea8J\xf2\xa01T\x02\x01,\x0b\x85Q\x92\x07\xe9\x9cK\x92\xd1,\xe0mRBL\x9c\xa6\x9b\xee" + b"\xeet)\x07Av\xc9/\x0b\x98\x93\xb4=\xd1v\xa4\xf5NG\xc6\xe5\xce\x93nk\x8d\x81\xf5\xed\x92\x80AmC\xbb\xde,.\x7f\x1fc" + b"\x0fU\xa79\x19\x82\x16]\x1248\x8f\xa5\x7f\x1c|\x92\xe2\xb8\xa59\xfd\xa5\x86\x8b.I\x9a\xf3\xd4W\x80\x8a\xd3\x9e" + 
b"\xfb\xba\\\xecm\x9f#\xee\xea\x92}M+\xffb\xb7\xb2\xc4\xc4K\x88Zui\xda\xedD\xfb\x00\xcfU6\xd3_\x02\x00\x00" + ), ) __short_description__ = __doc__.split("\n", maxsplit=1)[0] __about__ = f"""Koheesio -v{__version__} diff --git a/koheesio/__init__.py b/koheesio/__init__.py index ac44970f..e25d424e 100644 --- a/koheesio/__init__.py +++ b/koheesio/__init__.py @@ -1,8 +1,13 @@ # pragma: no cover + +import os + from koheesio.__about__ import __version__, _about from koheesio.models import BaseModel, ExtraParamsMixin from koheesio.steps import Step, StepOutput +from koheesio.utils import convert_str_to_bool +_koheesio_print_logo = convert_str_to_bool(os.environ.get("KOHEESIO__PRINT_LOGO", "True")) _logo_printed = False ABOUT = _about() VERSION = __version__ @@ -12,7 +17,9 @@ def print_logo(): global _logo_printed - if not _logo_printed: + global _koheesio_print_logo + + if not _logo_printed and _koheesio_print_logo: print(ABOUT) _logo_printed = True diff --git a/koheesio/context.py b/koheesio/context.py index efc01e03..0f4b69e3 100644 --- a/koheesio/context.py +++ b/koheesio/context.py @@ -14,9 +14,9 @@ from __future__ import annotations import re +from typing import Any, Dict, Union from collections.abc import Mapping from pathlib import Path -from typing import Any, Dict, Union import jsonpickle import tomli diff --git a/koheesio/logger.py b/koheesio/logger.py index 8035db60..0ddfe03c 100644 --- a/koheesio/logger.py +++ b/koheesio/logger.py @@ -33,8 +33,8 @@ import logging import os import sys -from logging import Formatter, Logger, getLogger from typing import Any, Dict, Generator, Generic, List, Optional, Tuple, TypeVar +from logging import Formatter, Logger, getLogger from uuid import uuid4 from warnings import warn diff --git a/koheesio/models/__init__.py b/koheesio/models/__init__.py index 525286f0..b405f65a 100644 --- a/koheesio/models/__init__.py +++ b/koheesio/models/__init__.py @@ -1,4 +1,4 @@ -""" Models package creates models that can be used to base other classes on. +"""Models package creates models that can be used to base other classes on. - Every model should be at least a pydantic BaseModel, but can also be a Step, or a StepOutput. - Every model is expected to be an ABC (Abstract Base Class) @@ -9,26 +9,21 @@ Transformation and Reader classes. 
""" -# pylint: disable=redefined-builtin +from typing import Annotated, Any, Dict, List, Optional, Union from abc import ABC from functools import cached_property from pathlib import Path -from typing import Annotated, Any, Dict, List, Optional, Union # to ensure that koheesio.models is a drop in replacement for pydantic -from pydantic import * # noqa from pydantic import BaseModel as PydanticBaseModel - -# noinspection PyUnresolvedReferences, PyProtectedMember +from pydantic import * # noqa from pydantic._internal._generics import PydanticGenericMetadata - -# noinspection PyUnresolvedReferences, PyProtectedMember from pydantic._internal._model_construction import ModelMetaclass from koheesio.context import Context from koheesio.logger import Logger, LoggingFactory -# pylint: enable=redefined-builtin +__all__ = ["BaseModel", "ExtraParamsMixin", "ListOfColumns", "ModelMetaclass", "PydanticGenericMetadata"] # pylint: disable=function-redefined @@ -126,10 +121,12 @@ class BaseModel(PydanticBaseModel, ABC): ```python from koheesio.models import BaseModel + class Person(BaseModel): name: str age: int + # Using the lazy method to create an instance without immediate validation person = Person.lazy() @@ -257,6 +254,7 @@ def from_context(cls, context: Context) -> BaseModel: class SomeStep(BaseModel): foo: str + context = Context(foo="bar") some_step = SomeStep.from_context(context) print(some_step.foo) # prints 'bar' @@ -605,6 +603,7 @@ class FooModel(BaseModel): foo: str lorem: str + foo_model = FooModel.lazy() foo_model.foo = "bar" foo_model.lorem = "ipsum" diff --git a/koheesio/models/sql.py b/koheesio/models/sql.py index 37207a76..eacb100e 100644 --- a/koheesio/models/sql.py +++ b/koheesio/models/sql.py @@ -1,8 +1,8 @@ """This module contains the base class for SQL steps.""" +from typing import Any, Dict, Optional, Union from abc import ABC from pathlib import Path -from typing import Any, Dict, Optional, Union from koheesio.models import ExtraParamsMixin, Field, model_validator from koheesio.steps import Step diff --git a/koheesio/steps/asyncio/__init__.py b/koheesio/steps/asyncio/__init__.py index b501c1de..4efdc38c 100644 --- a/koheesio/steps/asyncio/__init__.py +++ b/koheesio/steps/asyncio/__init__.py @@ -2,8 +2,8 @@ This module provides classes for asynchronous steps in the koheesio package. """ -from asyncio import iscoroutine from typing import Dict, Union +from asyncio import iscoroutine from koheesio.steps.step import Step, StepMetaClass, StepOutput @@ -64,9 +64,7 @@ def merge(self, other: Union[Dict, StepOutput]): For example: ```python step_output = StepOutput(foo="bar") - step_output.merge( - {"lorem": "ipsum"} - ) # step_output will now contain {'foo': 'bar', 'lorem': 'ipsum'} + step_output.merge({"lorem": "ipsum"}) # step_output will now contain {'foo': 'bar', 'lorem': 'ipsum'} ``` Functionally similar to adding two dicts together; like running `{**dict_a, **dict_b}`. 
diff --git a/koheesio/steps/asyncio/http.py b/koheesio/steps/asyncio/http.py index 015877fc..640e1cd7 100644 --- a/koheesio/steps/asyncio/http.py +++ b/koheesio/steps/asyncio/http.py @@ -12,6 +12,7 @@ import yarl from aiohttp import BaseConnector, ClientSession, TCPConnector from aiohttp_retry import ExponentialRetry, RetryClient, RetryOptionsBase + from pydantic import Field, SecretStr, field_validator, model_validator from koheesio.models import ExtraParamsMixin diff --git a/koheesio/steps/delta.py b/koheesio/steps/delta.py index 528739ff..fb6edf04 100644 --- a/koheesio/steps/delta.py +++ b/koheesio/steps/delta.py @@ -6,6 +6,7 @@ from typing import Dict, List, Optional, Union from py4j.protocol import Py4JJavaError # type: ignore + from pyspark.sql import DataFrame from pyspark.sql.types import DataType diff --git a/koheesio/steps/http.py b/koheesio/steps/http.py index 16d4bf6e..3ffe8c48 100644 --- a/koheesio/steps/http.py +++ b/koheesio/steps/http.py @@ -13,8 +13,8 @@ """ import json -from enum import Enum from typing import Any, Dict, List, Optional, Union +from enum import Enum import requests diff --git a/koheesio/steps/integrations/box.py b/koheesio/steps/integrations/box.py index bb52a6ec..fa29c482 100644 --- a/koheesio/steps/integrations/box.py +++ b/koheesio/steps/integrations/box.py @@ -11,15 +11,16 @@ """ import re +from typing import Any, Dict, Optional, Union from abc import ABC, abstractmethod from datetime import datetime from io import BytesIO from pathlib import PurePath -from typing import Any, Dict, Optional, Union from boxsdk import Client, JWTAuth from boxsdk.object.file import File from boxsdk.object.folder import Folder + from pyspark.sql import DataFrame from pyspark.sql.functions import expr, lit from pyspark.sql.types import StructType @@ -609,16 +610,12 @@ class BoxFileWriter(BoxFolderBase): from koheesio.steps.integrations.box import BoxFileWriter auth_params = {...} - f1 = BoxFileWriter( - **auth_params, path="/foo/bar", file="path/to/my/file.ext" - ).execute() + f1 = BoxFileWriter(**auth_params, path="/foo/bar", file="path/to/my/file.ext").execute() # or import io b = io.BytesIO(b"my-sample-data") - f2 = BoxFileWriter( - **auth_params, path="/foo/bar", file=b, name="file.ext" - ).execute() + f2 = BoxFileWriter(**auth_params, path="/foo/bar", file=b, name="file.ext").execute() ``` """ diff --git a/koheesio/steps/integrations/dq/spark_expectations.py b/koheesio/steps/integrations/dq/spark_expectations.py index 8724db43..687ccdc2 100644 --- a/koheesio/steps/integrations/dq/spark_expectations.py +++ b/koheesio/steps/integrations/dq/spark_expectations.py @@ -4,14 +4,16 @@ from typing import Any, Dict, Optional, Union -from pydantic import Field -from pyspark.sql import DataFrame from spark_expectations.config.user_config import Constants as user_config from spark_expectations.core.expectations import ( SparkExpectations, WrappedDataFrameWriter, ) +from pydantic import Field + +from pyspark.sql import DataFrame + from koheesio.steps.transformations import Transformation from koheesio.steps.writers import BatchOutputMode diff --git a/koheesio/steps/integrations/notifications/slack.py b/koheesio/steps/integrations/notifications/slack.py index 8d1b9278..e0689cb7 100644 --- a/koheesio/steps/integrations/notifications/slack.py +++ b/koheesio/steps/integrations/notifications/slack.py @@ -3,9 +3,9 @@ """ import json +from typing import Any, Dict, Optional from datetime import datetime from textwrap import dedent -from typing import Any, Dict, Optional from koheesio.models 
import ConfigDict, Field from koheesio.steps.http import HttpPostStep diff --git a/koheesio/steps/integrations/secrets/__init__.py b/koheesio/steps/integrations/secrets/__init__.py index 38047f42..0740aeaa 100644 --- a/koheesio/steps/integrations/secrets/__init__.py +++ b/koheesio/steps/integrations/secrets/__init__.py @@ -3,8 +3,8 @@ Contains abstract class for various secret integrations also known as SecretContext. """ -from abc import ABC, abstractmethod from typing import Optional +from abc import ABC, abstractmethod from koheesio.context import Context from koheesio.models import Field, SecretStr diff --git a/koheesio/steps/integrations/snowflake.py b/koheesio/steps/integrations/snowflake.py index f143d971..88ac8ce4 100644 --- a/koheesio/steps/integrations/snowflake.py +++ b/koheesio/steps/integrations/snowflake.py @@ -41,10 +41,10 @@ """ import json +from typing import Any, Dict, List, Optional, Set, Union from abc import ABC from copy import deepcopy from textwrap import dedent -from typing import Any, Dict, List, Optional, Set, Union from pyspark.sql import DataFrame, Window from pyspark.sql import functions as f diff --git a/koheesio/steps/integrations/sso/okta.py b/koheesio/steps/integrations/sso/okta.py index 5c06246c..4a0e840e 100644 --- a/koheesio/steps/integrations/sso/okta.py +++ b/koheesio/steps/integrations/sso/okta.py @@ -4,8 +4,8 @@ from __future__ import annotations -from logging import Filter from typing import Dict, Optional, Union +from logging import Filter from requests import HTTPError diff --git a/koheesio/steps/readers/__init__.py b/koheesio/steps/readers/__init__.py index 905912b9..a2ddd816 100644 --- a/koheesio/steps/readers/__init__.py +++ b/koheesio/steps/readers/__init__.py @@ -6,8 +6,8 @@ [reference/concepts/steps/readers](../../../reference/concepts/readers.md) section of the Koheesio documentation. """ -from abc import ABC, abstractmethod from typing import Optional +from abc import ABC, abstractmethod from pyspark.sql import DataFrame diff --git a/koheesio/steps/readers/databricks/autoloader.py b/koheesio/steps/readers/databricks/autoloader.py index 9a63d728..9922c8ae 100644 --- a/koheesio/steps/readers/databricks/autoloader.py +++ b/koheesio/steps/readers/databricks/autoloader.py @@ -3,8 +3,8 @@ Autoloader can ingest JSON, CSV, PARQUET, AVRO, ORC, TEXT, and BINARYFILE file formats. """ -from enum import Enum from typing import Dict, Optional, Union +from enum import Enum from koheesio.models import Field, field_validator from koheesio.steps.readers import Reader diff --git a/koheesio/steps/readers/file_loader.py b/koheesio/steps/readers/file_loader.py index f3bc51c2..1ed0f7f7 100644 --- a/koheesio/steps/readers/file_loader.py +++ b/koheesio/steps/readers/file_loader.py @@ -30,9 +30,9 @@ [official documentation](https://spark.apache.org/docs/latest/sql-data-sources.html). 
""" +from typing import Optional, Union from enum import Enum from pathlib import Path -from typing import Optional, Union from pyspark.sql.types import StructType @@ -80,9 +80,7 @@ class FileLoader(Reader, ExtraParamsMixin): Example: ```python - reader = FileLoader( - path="path/to/textfile.txt", format="text", header=True, lineSep="\n" - ) + reader = FileLoader(path="path/to/textfile.txt", format="text", header=True, lineSep="\n") ``` For more information about the available options, see Spark's diff --git a/koheesio/steps/readers/memory.py b/koheesio/steps/readers/memory.py index f667ee85..ed7e5e5e 100644 --- a/koheesio/steps/readers/memory.py +++ b/koheesio/steps/readers/memory.py @@ -3,9 +3,9 @@ """ import json +from typing import Any, Dict, Optional, Union from enum import Enum from functools import partial -from typing import Any, Dict, Optional, Union from pyspark.rdd import RDD from pyspark.sql import DataFrame diff --git a/koheesio/steps/readers/rest_api.py b/koheesio/steps/readers/rest_api.py index 2d0286f0..a71c1907 100644 --- a/koheesio/steps/readers/rest_api.py +++ b/koheesio/steps/readers/rest_api.py @@ -12,6 +12,7 @@ from typing import List, Tuple, Union from pydantic import Field, InstanceOf + from pyspark.sql.types import AtomicType, StructType from koheesio.steps.asyncio.http import AsyncHttpGetStep @@ -66,9 +67,7 @@ class RestApiReader(Reader): pages=3, session=session, ) - task = RestApiReader( - transport=transport, spark_schema="id: int, page:int, value: string" - ) + task = RestApiReader(transport=transport, spark_schema="id: int, page:int, value: string") task.execute() all_data = [row.asDict() for row in task.output.df.collect()] ``` @@ -93,9 +92,7 @@ class RestApiReader(Reader): connector=connector, ) - task = RestApiReader( - transport=transport, spark_schema="id: int, page:int, value: string" - ) + task = RestApiReader(transport=transport, spark_schema="id: int, page:int, value: string") task.execute() all_data = [row.asDict() for row in task.output.df.collect()] ``` diff --git a/koheesio/steps/spark.py b/koheesio/steps/spark.py index a21dd27c..daa8e334 100644 --- a/koheesio/steps/spark.py +++ b/koheesio/steps/spark.py @@ -4,10 +4,11 @@ from __future__ import annotations -from abc import ABC from typing import Optional +from abc import ABC from pydantic import Field + from pyspark.sql import Column from pyspark.sql import DataFrame as PySparkSQLDataFrame from pyspark.sql import SparkSession as OriginalSparkSession diff --git a/koheesio/steps/step.py b/koheesio/steps/step.py index 062ea4f3..ee1e8e15 100644 --- a/koheesio/steps/step.py +++ b/koheesio/steps/step.py @@ -8,9 +8,9 @@ import json import sys import warnings +from typing import Any from abc import abstractmethod from functools import partialmethod, wraps -from typing import Any import yaml @@ -51,6 +51,14 @@ class StepMetaClass(ModelMetaclass): allowing for the execute method to be auto-decorated with do_execute """ + # Solution to overcome issue with python>=3.11, + # When partialmethod is forgetting that _execute_wrapper + # is a method of wrapper, and it needs to pass that in as the first arg. 
+ # https://github.com/python/cpython/issues/99152 + class _partialmethod_with_self(partialmethod): + def __get__(self, obj, cls=None): + return self._make_unbound_method().__get__(obj, cls) + # Unique object to mark a function as wrapped _step_execute_wrapper_sentinel = object() diff --git a/koheesio/steps/transformations/__init__.py b/koheesio/steps/transformations/__init__.py index d99455d1..44e6635e 100644 --- a/koheesio/steps/transformations/__init__.py +++ b/koheesio/steps/transformations/__init__.py @@ -21,8 +21,8 @@ Extended ColumnsTransformation class with an additional `target_column` field """ -from abc import ABC, abstractmethod from typing import List, Optional, Union +from abc import ABC, abstractmethod from pyspark.sql import Column from pyspark.sql import functions as f @@ -58,9 +58,7 @@ class Transformation(SparkStep, ABC): class AddOne(Transformation): def execute(self): - self.output.df = self.df.withColumn( - "new_column", f.col("old_column") + 1 - ) + self.output.df = self.df.withColumn("new_column", f.col("old_column") + 1) ``` In the example above, the `execute` method is implemented to add 1 to the values of the `old_column` and store the diff --git a/koheesio/steps/transformations/arrays.py b/koheesio/steps/transformations/arrays.py index d97ef73b..bdc8124f 100644 --- a/koheesio/steps/transformations/arrays.py +++ b/koheesio/steps/transformations/arrays.py @@ -23,9 +23,9 @@ Base class for all transformations that operate on columns and have a target column. """ +from typing import Any from abc import ABC from functools import reduce -from typing import Any from pyspark.sql import Column from pyspark.sql import functions as F diff --git a/koheesio/steps/transformations/camel_to_snake.py b/koheesio/steps/transformations/camel_to_snake.py index 1cc9413d..70aebbca 100644 --- a/koheesio/steps/transformations/camel_to_snake.py +++ b/koheesio/steps/transformations/camel_to_snake.py @@ -48,9 +48,7 @@ class CamelToSnakeTransformation(ColumnsTransformation): | ... | ... 
| ```python - output_df = CamelToSnakeTransformation(column="camelCaseColumn").transform( - input_df - ) + output_df = CamelToSnakeTransformation(column="camelCaseColumn").transform(input_df) ``` __output_df:__ diff --git a/koheesio/steps/transformations/date_time/__init__.py b/koheesio/steps/transformations/date_time/__init__.py index 5e57f99c..1026d175 100644 --- a/koheesio/steps/transformations/date_time/__init__.py +++ b/koheesio/steps/transformations/date_time/__init__.py @@ -2,6 +2,8 @@ from typing import Optional, Union +from pytz import all_timezones_set + from pyspark.sql import Column from pyspark.sql import functions as f from pyspark.sql.functions import ( @@ -13,7 +15,6 @@ to_utc_timestamp, when, ) -from pytz import all_timezones_set from koheesio.models import Field, field_validator, model_validator from koheesio.steps.transformations import ColumnsTransformationWithTarget diff --git a/koheesio/steps/transformations/date_time/interval.py b/koheesio/steps/transformations/date_time/interval.py index 84dccfe0..1f872bcb 100644 --- a/koheesio/steps/transformations/date_time/interval.py +++ b/koheesio/steps/transformations/date_time/interval.py @@ -99,14 +99,10 @@ DateTimeAddInterval, ) -input_df = spark.createDataFrame( - [(1, "2022-01-01 00:00:00")], ["id", "my_column"] -) +input_df = spark.createDataFrame([(1, "2022-01-01 00:00:00")], ["id", "my_column"]) # add 1 day to my_column and store the result in a new column called 'one_day_later' -output_df = DateTimeAddInterval( - column="my_column", target_column="one_day_later", interval="1 day" -).transform(input_df) +output_df = DateTimeAddInterval(column="my_column", target_column="one_day_later", interval="1 day").transform(input_df) ``` __output_df__: diff --git a/koheesio/steps/transformations/lookup.py b/koheesio/steps/transformations/lookup.py index 34c11b22..ac8c926b 100644 --- a/koheesio/steps/transformations/lookup.py +++ b/koheesio/steps/transformations/lookup.py @@ -9,8 +9,8 @@ DataframeLookup """ -from enum import Enum from typing import List, Optional, Union +from enum import Enum import pyspark.sql.functions as f from pyspark.sql import Column, DataFrame @@ -102,9 +102,7 @@ class DataframeLookup(Transformation): df=left_df, other=right_df, on=JoinMapping(source_column="id", joined_column="id"), - targets=TargetColumn( - target_column="value", target_column_alias="right_value" - ), + targets=TargetColumn(target_column="value", target_column_alias="right_value"), how=JoinType.LEFT, ) diff --git a/koheesio/steps/transformations/strings/change_case.py b/koheesio/steps/transformations/strings/change_case.py index 26c1b743..62bbb003 100644 --- a/koheesio/steps/transformations/strings/change_case.py +++ b/koheesio/steps/transformations/strings/change_case.py @@ -51,9 +51,7 @@ class LowerCase(ColumnsTransformationWithTarget): | Beans| 1600| USA| ```python - output_df = LowerCase( - column="product", target_column="product_lower" - ).transform(df) + output_df = LowerCase(column="product", target_column="product_lower").transform(df) ``` __output_df:__ @@ -109,9 +107,7 @@ class UpperCase(LowerCase): | Beans| 1600| USA| ```python - output_df = UpperCase( - column="product", target_column="product_upper" - ).transform(df) + output_df = UpperCase(column="product", target_column="product_upper").transform(df) ``` __output_df:__ @@ -162,9 +158,7 @@ class TitleCase(LowerCase): | Beans| 1600| USA| ```python - output_df = TitleCase( - column="product", target_column="product_title" - ).transform(df) + output_df = 
TitleCase(column="product", target_column="product_title").transform(df) ``` __output_df:__ diff --git a/koheesio/steps/transformations/strings/split.py b/koheesio/steps/transformations/strings/split.py index a6d80ab6..68b3e96a 100644 --- a/koheesio/steps/transformations/strings/split.py +++ b/koheesio/steps/transformations/strings/split.py @@ -51,9 +51,7 @@ class SplitAll(ColumnsTransformationWithTarget): | Beans| 1600| USA| ```python - output_df = SplitColumn( - column="product", target_column="split", split_pattern=" " - ).transform(input_df) + output_df = SplitColumn(column="product", target_column="split", split_pattern=" ").transform(input_df) ``` __output_df:__ @@ -109,9 +107,7 @@ class SplitAtFirstMatch(SplitAll): | Beans| 1600| USA| ```python - output_df = SplitColumn( - column="product", target_column="split_first", split_pattern="an" - ).transform(input_df) + output_df = SplitColumn(column="product", target_column="split_first", split_pattern="an").transform(input_df) ``` __output_df:__ diff --git a/koheesio/steps/transformations/strings/trim.py b/koheesio/steps/transformations/strings/trim.py index d77ebd0d..db0e3117 100644 --- a/koheesio/steps/transformations/strings/trim.py +++ b/koheesio/steps/transformations/strings/trim.py @@ -57,9 +57,7 @@ class Trim(ColumnsTransformationWithTarget): ### Trim whitespace from the beginning of a string ```python - output_df = Trim( - column="column", target_column="trimmed_column", direction="left" - ).transform(input_df) + output_df = Trim(column="column", target_column="trimmed_column", direction="left").transform(input_df) ``` __output_df:__ @@ -86,9 +84,7 @@ class Trim(ColumnsTransformationWithTarget): ### Trim whitespace from the end of a string ```python - output_df = Trim( - column="column", target_column="trimmed_column", direction="right" - ).transform(input_df) + output_df = Trim(column="column", target_column="trimmed_column", direction="right").transform(input_df) ``` __output_df:__ diff --git a/koheesio/steps/transformations/transform.py b/koheesio/steps/transformations/transform.py index f97ab70e..07771c7c 100644 --- a/koheesio/steps/transformations/transform.py +++ b/koheesio/steps/transformations/transform.py @@ -6,8 +6,8 @@ from __future__ import annotations -from functools import partial from typing import Callable, Dict +from functools import partial from pyspark.sql import DataFrame diff --git a/koheesio/steps/transformations/uuid5.py b/koheesio/steps/transformations/uuid5.py index 79e62e90..3e2df23a 100644 --- a/koheesio/steps/transformations/uuid5.py +++ b/koheesio/steps/transformations/uuid5.py @@ -110,9 +110,7 @@ class HashUUID5(Transformation): In code: ```python - HashUUID5(source_columns=["id", "string"], target_column="uuid5").transform( - input_df - ) + HashUUID5(source_columns=["id", "string"], target_column="uuid5").transform(input_df) ``` In this example, the `id` and `string` columns are concatenated and hashed using the UUID5 algorithm. 
The result is diff --git a/koheesio/steps/writers/__init__.py b/koheesio/steps/writers/__init__.py index bb29e537..87e2a30a 100644 --- a/koheesio/steps/writers/__init__.py +++ b/koheesio/steps/writers/__init__.py @@ -1,8 +1,8 @@ """The Writer class is used to write the DataFrame to a target.""" +from typing import Optional from abc import ABC, abstractmethod from enum import Enum -from typing import Optional from pyspark.sql import DataFrame diff --git a/koheesio/steps/writers/buffer.py b/koheesio/steps/writers/buffer.py index 065ced94..e9128719 100644 --- a/koheesio/steps/writers/buffer.py +++ b/koheesio/steps/writers/buffer.py @@ -14,14 +14,16 @@ """ import gzip +from typing import Literal, Optional from abc import ABC from functools import partial from os import linesep from tempfile import SpooledTemporaryFile -from typing import Literal, Optional from pandas._typing import CompressionOptions as PandasCompressionOptions + from pydantic import InstanceOf + from pyspark import pandas from koheesio.models import ExtraParamsMixin, Field, constr @@ -129,7 +131,7 @@ class PandasCsvBufferWriter(BufferWriter, ExtraParamsMixin): |------------------|-----------------|-----------------|----------------|-------------------|------------------|-------| | maxRecordsPerFile| ... | chunksize | None | max_records_per_file | ... | Spark property name: spark.sql.files.maxRecordsPerFile | | sep | , | sep | , | sep | , | | - | lineSep | `\\n ` | line_terminator | os.linesep | lineSep (alias=line_terminator) | \\n | | + | lineSep | `\\n ` | line_terminator | os.linesep | lineSep (alias=line_terminator) | \\n | | | N/A | ... | index | True | index | False | Determines whether row labels (index) are included in the output | | header | False | header | True | header | True | | | quote | " | quotechar | " | quote (alias=quotechar) | " | | @@ -258,8 +260,19 @@ class Output(BufferWriter.Output): pandas_df: Optional[pandas.DataFrame] = Field(None, description="The Pandas DataFrame that was written") - def get_options(self): + def get_options(self, options_type: str = "csv"): """Returns the options to pass to Pandas' to_csv() method.""" + try: + import pandas as _pd + + # Get the pandas version as a tuple of integers + pandas_version = tuple(int(i) for i in _pd.__version__.split(".")) + except ImportError: + raise ImportError("Pandas is required to use this writer") + + # Use line_separator for pandas 2.0.0 and later + line_sep_option_naming = "line_separator" if pandas_version >= (2, 0, 0) else "line_terminator" + csv_options = { "header": self.header, "sep": self.sep, @@ -267,12 +280,18 @@ def get_options(self): "doublequote": self.quoteAll, "escapechar": self.escape, "na_rep": self.emptyValue or self.nullValue, - "line_terminator": self.lineSep, + line_sep_option_naming: self.lineSep, "index": self.index, "date_format": self.timestampFormat, "compression": self.compression, **self.params, } + + if options_type == "spark": + csv_options["lineterminator"] = csv_options.pop(line_sep_option_naming) + elif options_type == "kohesio_pandas_buffer_writer": + csv_options["line_terminator"] = csv_options.pop(line_sep_option_naming) + return csv_options def execute(self): @@ -284,7 +303,7 @@ def execute(self): # create csv file in memory file_buffer = self.output.buffer - self.output.pandas_df.to_csv(file_buffer, **self.get_options()) + self.output.pandas_df.to_csv(file_buffer, **self.get_options(options_type="spark")) # pylint: disable=C0301 diff --git a/koheesio/steps/writers/delta/batch.py 
b/koheesio/steps/writers/delta/batch.py index 01cce76a..5a089b26 100644 --- a/koheesio/steps/writers/delta/batch.py +++ b/koheesio/steps/writers/delta/batch.py @@ -34,11 +34,12 @@ ``` """ -from functools import partial from typing import List, Optional, Set, Type, Union +from functools import partial from delta.tables import DeltaMergeBuilder, DeltaTable from py4j.protocol import Py4JError + from pyspark.sql import DataFrameWriter from koheesio.models import ExtraParamsMixin, Field, field_validator diff --git a/koheesio/steps/writers/delta/scd.py b/koheesio/steps/writers/delta/scd.py index 3ded7901..2c96a24f 100644 --- a/koheesio/steps/writers/delta/scd.py +++ b/koheesio/steps/writers/delta/scd.py @@ -15,11 +15,13 @@ """ -from logging import Logger from typing import List, Optional +from logging import Logger from delta.tables import DeltaMergeBuilder, DeltaTable + from pydantic import InstanceOf + from pyspark.sql import Column from pyspark.sql import functions as F from pyspark.sql.types import DateType, TimestampType diff --git a/koheesio/steps/writers/delta/stream.py b/koheesio/steps/writers/delta/stream.py index f33dc555..d126f050 100644 --- a/koheesio/steps/writers/delta/stream.py +++ b/koheesio/steps/writers/delta/stream.py @@ -2,8 +2,8 @@ This module defines the DeltaTableStreamWriter class, which is used to write streaming dataframes to Delta tables. """ -from email.policy import default from typing import Optional +from email.policy import default from pydantic import Field diff --git a/koheesio/steps/writers/sftp.py b/koheesio/steps/writers/sftp.py index 18773444..5ddcf237 100644 --- a/koheesio/steps/writers/sftp.py +++ b/koheesio/steps/writers/sftp.py @@ -14,9 +14,9 @@ import hashlib import time +from typing import Optional, Union from enum import Enum from pathlib import Path -from typing import Optional, Union from paramiko.sftp_client import SFTPClient from paramiko.transport import Transport @@ -383,7 +383,7 @@ class SendCsvToSftp(PandasCsvBufferWriter, SFTPWriter): @model_validator(mode="after") def set_up_buffer_writer(self) -> "SendCsvToSftp": """Set up the buffer writer, passing all CSV related options to it.""" - self.buffer_writer = PandasCsvBufferWriter(**self.get_options()) + self.buffer_writer = PandasCsvBufferWriter(**self.get_options(options_type="kohesio_pandas_buffer_writer")) return self def execute(self): diff --git a/koheesio/steps/writers/stream.py b/koheesio/steps/writers/stream.py index db38867a..63e2f724 100644 --- a/koheesio/steps/writers/stream.py +++ b/koheesio/steps/writers/stream.py @@ -15,8 +15,8 @@ class to run a writer for each batch function to be used as batch_function for StreamWriter (sub)classes """ -from abc import ABC, abstractmethod from typing import Callable, Dict, Optional, Union +from abc import ABC, abstractmethod from pyspark.sql.streaming import DataStreamWriter, StreamingQuery diff --git a/koheesio/utils/__init__.py b/koheesio/utils/__init__.py index 61d2595c..e6f4d8a5 100644 --- a/koheesio/utils/__init__.py +++ b/koheesio/utils/__init__.py @@ -5,10 +5,10 @@ import inspect import os import uuid +from typing import Any, Callable, Dict, Optional, Tuple from functools import partial from importlib import import_module from pathlib import Path -from typing import Any, Callable, Dict, Optional, Tuple __all__ = [ "get_args_for_func", diff --git a/pyproject.toml b/pyproject.toml index 9734a79d..ed0c901e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,6 +11,7 @@ keywords = [ authors = [ # TODO: add other contributors { name = 
"Danny Meijer", email = "danny.meijer@nike.com" }, + { name = "Mikita Sakalouski", email = "mikita.sakalouski@nike.com" }, ] classifiers = [ "Development Status :: 5 - Production/Stable", @@ -19,7 +20,6 @@ classifiers = [ "Natural Language :: English", "Operating System :: OS Independent", "Programming Language :: Python", - "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", @@ -46,12 +46,30 @@ async_http = [ "nest-asyncio>=1.6.0", ] box = ["boxsdk[jwt]==3.8.1"] -pandas = ["pandas<2.0.0", "setuptools"] +# SFTP dependencies in to_csv line_iterator +pandas = ["pandas>=1.3", "setuptools"] pyspark = ["pyspark>=3.2.0", "pyarrow>13"] -se = ["spark-expectations>=1.1.0"] +#FIXME: loose versioning in spark_excpectations for pluggy module +# se = ["spark-expectations>=1.1.0"] +se = [] sftp = ["paramiko>=2.6.0"] delta = ["delta-spark>=2.2"] dev = ["black", "isort", "ruff", "mypy", "pylint", "colorama", "types-PyYAML"] +test = [ + "chispa", + "coverage[toml]", + "pytest", + "pytest-asyncio", + "pytest-cov", + "pytest-mock", + "pytest-order", + "pytest-sftpserver", + "pytest-xdist", + "pytest-randomly", + "requests_mock", + "time_machine", +] + [project.urls] Documentation = "https://github.com/Nike-Inc/koheesio#readme" @@ -68,114 +86,6 @@ Source = "https://github.com/Nike-Inc/koheesio" requires = ["hatchling"] build-backend = "hatchling.build" -[tool.hatch.version] -description = """ -The version of the package is dynamically set and is maintained in the top-level `__init__.py` file of the package. ---- -Bump by running `hatch version` with one of the following release types: -- `major` - breaking changes, i.e. 2.0.0 -- `minor` - new features, i.e. 1.1.0 -- `micro` `fix` or `patch` - bug fixes, i.e. 1.0.1 -- `a` or `alpha` - alpha release, i.e. 1.0.0a1 -- `b` or `beta` - beta release, i.e. 1.0.0b1 -- `c` `rc` `pre` or `preview` - release candidate, i.e. 1.0.0rc1 -- `r` `rev` or `post` - post release, i.e. 1.0.0.post1 -- `dev` - development release, i.e. 1.0.0.dev1 -""" -path = "koheesio/__about__.py" - -[tool.hatch.envs.hatch-static-analysis.scripts] -#TODO: move scripts from linting and style here -format-check = "black --check --diff ." -format-fix = "black ." -lint-check = "ruff check ." -lint-fix = "echo 'No formatting is required for this project.'" - -[tool.hatch.envs.default] -description = """ -The default environment is used for development and general use. ---- -We use the `uv` installer by default. This is a superfast, Rust-based installer. -Run `hatch run` to run scripts in the default environment. - -# Code Quality -To check and format the codebase, we use: - - `black` for code formatting - - `isort` for import sorting (includes colorama for colored output) - - `ruff` for linting. - - `mypy` for static type checking. - - `pylint` for code quality checks. ---- -There are several ways to run style checks and formatting: -- `hatch run black-check` will check the codebase with black without applying fixes. -- `hatch run black-fmt` will format the codebase using black. -- `hatch run isort-check` will check the codebase with isort without applying fixes. -- `hatch run isort-fmt` will format the codebase using isort. -- `hatch run ruff-check` will check the codebase with ruff without applying fixes. -- `hatch run ruff-fmt` will format the codebase using ruff. -- `hatch run mypy-check` will check the codebase with mypy. -- `hatch run pylint-check` will check the codebase with pylint. 
-- `hatch run check` will run all the above checks (including pylint and mypy). -- `hatch run fmt` or `hatch run fix` will format the codebase using black, isort, and ruff. -- `hatch run lint` will run ruff, mypy, and pylint. - -# Testing and Coverage -To run the test suite, use: -- `hatch run all-tests` to run the full test suite. -- `hatch run spark-tests` to run the Spark test suite. -- `hatch run log-versions` to log the Python and PySpark versions. -- `hatch run coverage` or `hatch run cov` to run the test suite with coverage. - -Note: the full test suite is will run all tests in all specified Python and PySpark versions. If you want to run tests -against specific versions, you can add `+py=3.10` or `+version=pyspark34` to the command (replace the versions with the -desired ones). - -For lighter / faster testing, use the `dev` environment with `hatch shell dev` and run the tests with `pytest` or use -the `all-tests` or `spark-tests` command. -""" -installer = "uv" -features = [ - "async", - "async_http", - "pyspark", - "pandas", - "sftp", - "delta", - "se", - "box", - "dev", -] - -[tool.hatch.envs.default.scripts] -# TODO: add scripts section based on Makefile -# TODO: add bandit -# TODO: move scripts from linting and style here -# Code Quality commands -black-check = "black --check --diff ." -black-fmt = "black ." -isort-check = "isort . --check --diff --color" -isort-fmt = "isort ." -ruff-check = "ruff check ." -ruff-fmt = "ruff check . --fix" -mypy-check = "mypy koheesio" -pylint-check = "pylint --output-format=colorized -d W0511 koheesio" -check = [ - "- black-check", - "- isort-check", - "- ruff-check", - "- mypy-check", - "- pylint-check", -] -fmt = ["black-fmt", "isort-fmt", "ruff-fmt"] -fix = "fmt" -lint = ["- ruff-fmt", "- mypy-check", "pylint-check"] -# Testing and Coverage commands -log-versions = "python --version && {env:HATCH_UV} pip freeze | grep pyspark" -all-tests = "pytest test/ {env:HATCH_TEST_ARGS:} {args}" -spark-tests = "pytest test/spark {env:HATCH_TEST_ARGS:} {args}" -non-spark-tests = "pytest test/ {env:HATCH_TEST_ARGS:} {args} -k 'not spark'" -coverage = "pytest test/ {env:HATCH_TEST_ARGS:} {args} --cov=koheesio --cov-report=html --cov-report=term-missing --cov-fail-under=90" -cov = "coverage" [tool.black] line-length = 120 @@ -183,12 +93,11 @@ target-version = ['py39', 'py310', 'py311', 'py312'] include = '\.pyi?$' extend-exclude = ''' /( - | __research__ - | __notebooks__ - | test/data + | tests/_data )/ ''' + [tool.isort] profile = "black" skip = [ @@ -215,7 +124,20 @@ skip = [ "node_modules", "venv", ] -force_to_top = ["*"] +force_to_top = ["__future__", "typing"] +sections = [ + "FUTURE", + "STDLIB", + "THIRDPARTY", + "PYDANTIC", + "PYSPARK", + "KOHEESIO", + "FIRSTPARTY", + "LOCALFOLDER", +] +known_pydantic = ["pydantic"] +known_pyspark = ["pyspark"] +known_koheesio = ["koheesio"] [tool.ruff] # https://docs.astral.sh/ruff/configuration/#using-pyprojecttoml @@ -316,10 +238,6 @@ unfixable = [] # Allow unused variables when underscore-prefixed. 
dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$" -[tool.ruff.lint.isort.sections] -"koheesio" = ["koheesio"] -"pyspark" = ["pyspark"] - [tool.mypy] check_untyped_defs = false disallow_untyped_calls = false @@ -413,17 +331,71 @@ notes = ["FIXME", "TODO"] [tool.pylint.refactoring] max-nested-blocks = 3 +[tool.pytest.ini_options] +markers = [ + "default: added to all tests by default if no other marker except for standard pytest markers is present", + "spark: mark a test as a Spark test", + # "sftp: mark a test as an SFTP test", + # "se: mark a test as a Spark Expectations test", + # "box: mark a test as a Box test", + # "asyncio: mark a test as an asyncio test", + # "asyncio_http: mark a test as an asyncio HTTP test", +] +addopts = "-q --color=yes --order-scope=module" +log_level = "CRITICAL" +filterwarnings = [ + # pyspark.pandas warnings + "ignore:distutils.*:DeprecationWarning:pyspark.pandas.*", + "ignore:'PYARROW_IGNORE_TIMEZONE'.*:UserWarning:pyspark.pandas.*", + "ignore:distutils.*:DeprecationWarning:pyspark.sql.pandas.*", + "ignore:is_datetime64tz_dtype.*:DeprecationWarning:pyspark.sql.pandas.*", + # Koheesio warnings + "ignore:DayTimeIntervalType .*:UserWarning:koheesio.steps.integrations.snowflake.*", + "ignore:RankDedup is deprecated. Use RowNumberDedup instead.:UserWarning:koheesio.steps.transformations.*", +] + +[tool.coverage.run] +branch = true +source = ["koheesio"] + +[tool.coverage.report] +exclude_lines = ["no cov", "if __name__ == .__main__.:", "if TYPE_CHECKING:"] +omit = ["tests/*"] + ### ~~~~~~~~~~~~~~~~~~~~~ ### # Unit Testing and Coverage # ### ~~~~~~~~~~~~~~~~~~~~~ ### # -#TODO: see if we can make this through hatch-test env (using hatch test command) -[tool.hatch.envs.test] + +[tool.hatch.version] description = """ +The version of the package is dynamically set and is maintained in the top-level `__init__.py` file of the package. +--- +Bump by running `hatch version` with one of the following release types: +- `major` - breaking changes, i.e. 2.0.0 +- `minor` - new features, i.e. 1.1.0 +- `micro` `fix` or `patch` - bug fixes, i.e. 1.0.1 +- `a` or `alpha` - alpha release, i.e. 1.0.0a1 +- `b` or `beta` - beta release, i.e. 1.0.0b1 +- `c` `rc` `pre` or `preview` - release candidate, i.e. 1.0.0rc1 +- `r` `rev` or `post` - post release, i.e. 1.0.0.post1 +- `dev` - development release, i.e. 1.0.0.dev1 +""" +path = "koheesio/__about__.py" + +[tool.hatch.envs.hatch-static-analysis.scripts] +#TODO: move scripts from linting and style here +format-check = "black --check --diff ." +format-fix = "black ." +lint-check = "ruff check ." +lint-fix = "echo 'No formatting is required for this project.'" + +[tool.hatch.envs.hatch-test] description = """ The test environment is used to run the test suite. --- -- Run `hatch test` to run the full test suite. - Run `hatch run test:spark-tests` to run the Spark test suite. - Run `hatch run test:log-versions` to log the Python and PySpark versions. - You can test against specific Python and PySpark versions by adding `+py=3.10` or `+version=pyspark34` to the command @@ -432,19 +404,6 @@ The test environment is used to run the test suite.
""" -dependencies = [ - "chispa", - "coverage[toml]", - "pytest", - "pytest-asyncio", - "pytest-cov", - "pytest-mock", - "pytest-order", - "pytest-sftpserver", - "pytest-xdist", - "requests_mock", - "time_machine", -] features = [ "async", "async_http", @@ -455,30 +414,33 @@ features = [ "sftp", "delta", "dev", + "test", ] -matrix = [ - { python = [ - "3.9", - ], version = [ - "pyspark33", - "pyspark34", - ] }, - { python = [ - "3.10", - ], version = [ - "pyspark33", - "pyspark34", - "pyspark35", - ] }, - { python = [ - "3.11", - "3.12", - ], version = [ - "pyspark35", - ] }, -] -[tool.hatch.envs.test.overrides] +parallel = true +retries = 2 +retry-delay = 1 + +[tool.hatch.envs.hatch-test.scripts] +run = "pytest{env:HATCH_TEST_ARGS:} {args} -n auto" +run-cov = "coverage run -m pytest{env:HATCH_TEST_ARGS:} {args}" +cov-combine = "coverage combine" +cov-report = "coverage report" + + +[[tool.hatch.envs.hatch-test.matrix]] +python = ["3.9"] +version = ["pyspark33", "pyspark34"] + +[[tool.hatch.envs.hatch-test.matrix]] +python = ["3.10"] +version = ["pyspark33", "pyspark34", "pyspark35"] + +[[tool.hatch.envs.hatch-test.matrix]] +python = ["3.11", "3.12"] +version = ["pyspark35"] + +[tool.hatch.envs.hatch-test.overrides] matrix.version.extra-dependencies = [ { value = "pyspark>=3.3,<3.4", if = [ "pyspark33", @@ -491,24 +453,114 @@ matrix.version.extra-dependencies = [ ] }, ] -[tool.pytest.ini_options] -addopts = "-q --color=yes --order-scope=module" -log_level = "CRITICAL" -testpaths = ["test"] - -#FIXME: Is it duplication of hatch coverage ? -[tool.coverage.run] -branch = true -source = ["koheesio"] - -[tool.coverage.report] -exclude_lines = ["no cov", "if __name__ == .__main__.:", "if TYPE_CHECKING:"] -omit = ["test/*"] +name.".*".env-vars = [ + { key = "PYTEST_XDIST_AUTO_NUM_WORKERS", value = "2" }, + { key = "KOHEESIO__PRINT_LOGO", value = "False" }, +] ### ~~~~ ### # Docsite # ### ~~~~ ### # +[tool.hatch.envs.default] +description = """ +The default environment is used for development and general use. +--- +We use the `uv` installer by default. This is a superfast, Rust-based installer. +Run `hatch run` to run scripts in the default environment. + +# Code Quality +To check and format the codebase, we use: + - `black` for code formatting + - `isort` for import sorting (includes colorama for colored output) + - `ruff` for linting. + - `mypy` for static type checking. + - `pylint` for code quality checks. +--- +There are several ways to run style checks and formatting: +- `hatch run black-check` will check the codebase with black without applying fixes. +- `hatch run black-fmt` will format the codebase using black. +- `hatch run isort-check` will check the codebase with isort without applying fixes. +- `hatch run isort-fmt` will format the codebase using isort. +- `hatch run ruff-check` will check the codebase with ruff without applying fixes. +- `hatch run ruff-fmt` will format the codebase using ruff. +- `hatch run mypy-check` will check the codebase with mypy. +- `hatch run pylint-check` will check the codebase with pylint. +- `hatch run check` will run all the above checks (including pylint and mypy). +- `hatch run fmt` or `hatch run fix` will format the codebase using black, isort, and ruff. +- `hatch run lint` will run ruff, mypy, and pylint. + +# Testing and Coverage +To run the test suite, use: +- `hatch run all-tests` to run the full test suite. +- `hatch run spark-tests` to run the Spark test suite. +- `hatch run log-versions` to log the Python and PySpark versions. 
+- `hatch run coverage` or `hatch run cov` to run the test suite with coverage. + +Note: the full test suite will run all tests in all specified Python and PySpark versions. If you want to run tests +against specific versions, you can add `+py=3.10` or `+version=pyspark34` to the command (replace the versions with the +desired ones). + +For lighter / faster testing, use the `dev` environment with `hatch shell dev` and run the tests with `pytest` or use +the `all-tests` or `spark-tests` command. +""" +installer = "uv" +features = [ + "async", + "async_http", + "pandas", + "pyspark", + "sftp", + "delta", + "se", + "box", + "dev", +] + + +[tool.hatch.envs.default.scripts] +# TODO: add scripts section based on Makefile +# TODO: add bandit +# TODO: move scripts from linting and style here +# Code Quality commands +black-check = "black --check --diff ." +black-fmt = "black ." +isort-check = "isort . --check --diff --color" +isort-fmt = "isort ." +ruff-check = "ruff check ." +ruff-fmt = "ruff check . --fix" +mypy-check = "mypy koheesio" +pylint-check = "pylint --output-format=colorized -d W0511 koheesio" +check = [ + "- black-check", + "- isort-check", + "- ruff-check", + "- mypy-check", + "- pylint-check", +] +fmt = ["black-fmt", "isort-fmt", "ruff-fmt"] +fix = "fmt" +lint = ["- ruff-fmt", "- mypy-check", "pylint-check"] +log-versions = "python --version && {env:HATCH_UV} pip freeze | grep pyspark" +test = "- pytest{env:HATCH_TEST_ARGS:} {args} -n 2" +spark-tests = "test -m spark" +non-spark-tests = "test -m \"not spark\"" + +# scripts.run = "echo bla {env:HATCH_TEST_ARGS:} {args}" +# scripts.run = "- log-versions && pytest tests/ {env:HATCH_TEST_ARGS:} {args}" +# run ="echo {args}" +# run = "- pytest tests/ {env:HATCH_TEST_ARGS:} {args}" +# run-cov = "coverage run -m pytest{env:HATCH_TEST_ARGS:} {args}" +# cov-combine = "coverage combine" +# cov-report = "coverage report" +# log-versions = "python --version && {env:HATCH_UV} pip freeze | grep pyspark" +# +# +# +# coverage = "- pytest tests/ {env:HATCH_TEST_ARGS:} {args} --cov=koheesio --cov-report=html --cov-report=term-missing --cov-fail-under=90" +# cov = "coverage" + + [tool.hatch.envs.docs] description = """ The docs environment is used to build the documentation. @@ -572,6 +624,18 @@ Available scripts: - `coverage` or `cov` - run the test suite with coverage.
""" path = ".venv" -template = "test" python = "3.10" +template = "default" +features = [ + "async", + "async_http", + "box", + "pandas", + "pyspark", + "se", + "sftp", + "delta", + "dev", + "test", +] extra-dependencies = ["pyspark==3.4.*"] diff --git a/test/_data/context/common.yml b/tests/_data/context/common.yml similarity index 100% rename from test/_data/context/common.yml rename to tests/_data/context/common.yml diff --git a/test/_data/context/dev.yml b/tests/_data/context/dev.yml similarity index 100% rename from test/_data/context/dev.yml rename to tests/_data/context/dev.yml diff --git a/test/_data/context/sample.json b/tests/_data/context/sample.json similarity index 100% rename from test/_data/context/sample.json rename to tests/_data/context/sample.json diff --git a/test/_data/context/sample.toml b/tests/_data/context/sample.toml similarity index 100% rename from test/_data/context/sample.toml rename to tests/_data/context/sample.toml diff --git a/test/_data/context/sample.yaml b/tests/_data/context/sample.yaml similarity index 100% rename from test/_data/context/sample.yaml rename to tests/_data/context/sample.yaml diff --git a/test/_data/readers/avro_file/.part-00000-c95e3fda-3943-4701-b0ae-cb6bc7e015e4-c000.avro.crc b/tests/_data/readers/avro_file/.part-00000-c95e3fda-3943-4701-b0ae-cb6bc7e015e4-c000.avro.crc similarity index 100% rename from test/_data/readers/avro_file/.part-00000-c95e3fda-3943-4701-b0ae-cb6bc7e015e4-c000.avro.crc rename to tests/_data/readers/avro_file/.part-00000-c95e3fda-3943-4701-b0ae-cb6bc7e015e4-c000.avro.crc diff --git a/test/_data/readers/avro_file/part-00000-c95e3fda-3943-4701-b0ae-cb6bc7e015e4-c000.avro b/tests/_data/readers/avro_file/part-00000-c95e3fda-3943-4701-b0ae-cb6bc7e015e4-c000.avro similarity index 100% rename from test/_data/readers/avro_file/part-00000-c95e3fda-3943-4701-b0ae-cb6bc7e015e4-c000.avro rename to tests/_data/readers/avro_file/part-00000-c95e3fda-3943-4701-b0ae-cb6bc7e015e4-c000.avro diff --git a/test/_data/readers/csv_file/dummy.csv b/tests/_data/readers/csv_file/dummy.csv similarity index 100% rename from test/_data/readers/csv_file/dummy.csv rename to tests/_data/readers/csv_file/dummy.csv diff --git a/test/_data/readers/csv_file/dummy_semicolon.csv b/tests/_data/readers/csv_file/dummy_semicolon.csv similarity index 100% rename from test/_data/readers/csv_file/dummy_semicolon.csv rename to tests/_data/readers/csv_file/dummy_semicolon.csv diff --git a/test/_data/readers/delta_file/.part-00000-0f8b246f-e779-4d48-896a-6a12362bf370-c000.snappy.parquet.crc b/tests/_data/readers/delta_file/.part-00000-0f8b246f-e779-4d48-896a-6a12362bf370-c000.snappy.parquet.crc similarity index 100% rename from test/_data/readers/delta_file/.part-00000-0f8b246f-e779-4d48-896a-6a12362bf370-c000.snappy.parquet.crc rename to tests/_data/readers/delta_file/.part-00000-0f8b246f-e779-4d48-896a-6a12362bf370-c000.snappy.parquet.crc diff --git a/test/_data/readers/delta_file/_delta_log/.00000000000000000000.json.crc b/tests/_data/readers/delta_file/_delta_log/.00000000000000000000.json.crc similarity index 100% rename from test/_data/readers/delta_file/_delta_log/.00000000000000000000.json.crc rename to tests/_data/readers/delta_file/_delta_log/.00000000000000000000.json.crc diff --git a/test/_data/readers/delta_file/_delta_log/00000000000000000000.json b/tests/_data/readers/delta_file/_delta_log/00000000000000000000.json similarity index 100% rename from test/_data/readers/delta_file/_delta_log/00000000000000000000.json rename to 
tests/_data/readers/delta_file/_delta_log/00000000000000000000.json diff --git a/test/_data/readers/delta_file/part-00000-0f8b246f-e779-4d48-896a-6a12362bf370-c000.snappy.parquet b/tests/_data/readers/delta_file/part-00000-0f8b246f-e779-4d48-896a-6a12362bf370-c000.snappy.parquet similarity index 100% rename from test/_data/readers/delta_file/part-00000-0f8b246f-e779-4d48-896a-6a12362bf370-c000.snappy.parquet rename to tests/_data/readers/delta_file/part-00000-0f8b246f-e779-4d48-896a-6a12362bf370-c000.snappy.parquet diff --git a/test/_data/readers/json_file/dummy.json b/tests/_data/readers/json_file/dummy.json similarity index 100% rename from test/_data/readers/json_file/dummy.json rename to tests/_data/readers/json_file/dummy.json diff --git a/test/_data/readers/json_file/dummy_simple.json b/tests/_data/readers/json_file/dummy_simple.json similarity index 100% rename from test/_data/readers/json_file/dummy_simple.json rename to tests/_data/readers/json_file/dummy_simple.json diff --git a/test/_data/readers/orc_file/.part-00000-ddcce057-2ef4-44b1-baf4-d263f748b060-c000.snappy.orc.crc b/tests/_data/readers/orc_file/.part-00000-ddcce057-2ef4-44b1-baf4-d263f748b060-c000.snappy.orc.crc similarity index 100% rename from test/_data/readers/orc_file/.part-00000-ddcce057-2ef4-44b1-baf4-d263f748b060-c000.snappy.orc.crc rename to tests/_data/readers/orc_file/.part-00000-ddcce057-2ef4-44b1-baf4-d263f748b060-c000.snappy.orc.crc diff --git a/test/_data/readers/orc_file/part-00000-ddcce057-2ef4-44b1-baf4-d263f748b060-c000.snappy.orc b/tests/_data/readers/orc_file/part-00000-ddcce057-2ef4-44b1-baf4-d263f748b060-c000.snappy.orc similarity index 100% rename from test/_data/readers/orc_file/part-00000-ddcce057-2ef4-44b1-baf4-d263f748b060-c000.snappy.orc rename to tests/_data/readers/orc_file/part-00000-ddcce057-2ef4-44b1-baf4-d263f748b060-c000.snappy.orc diff --git a/test/_data/sql/spark_sql_reader.sql b/tests/_data/sql/spark_sql_reader.sql similarity index 100% rename from test/_data/sql/spark_sql_reader.sql rename to tests/_data/sql/spark_sql_reader.sql diff --git a/test/_data/steps/expected_step_output.yaml b/tests/_data/steps/expected_step_output.yaml similarity index 100% rename from test/_data/steps/expected_step_output.yaml rename to tests/_data/steps/expected_step_output.yaml diff --git a/test/_data/steps/expected_step_output_simple.yaml b/tests/_data/steps/expected_step_output_simple.yaml similarity index 100% rename from test/_data/steps/expected_step_output_simple.yaml rename to tests/_data/steps/expected_step_output_simple.yaml diff --git a/test/_data/transformations/dummy.sql b/tests/_data/transformations/dummy.sql similarity index 100% rename from test/_data/transformations/dummy.sql rename to tests/_data/transformations/dummy.sql diff --git a/test/_data/transformations/spark_expectations_resources/test_product_dq_stats.ddl b/tests/_data/transformations/spark_expectations_resources/test_product_dq_stats.ddl similarity index 100% rename from test/_data/transformations/spark_expectations_resources/test_product_dq_stats.ddl rename to tests/_data/transformations/spark_expectations_resources/test_product_dq_stats.ddl diff --git a/test/_data/transformations/spark_expectations_resources/test_product_rules.ddl b/tests/_data/transformations/spark_expectations_resources/test_product_rules.ddl similarity index 100% rename from test/_data/transformations/spark_expectations_resources/test_product_rules.ddl rename to tests/_data/transformations/spark_expectations_resources/test_product_rules.ddl diff --git 
a/test/_data/transformations/spark_expectations_resources/test_product_rules.sql b/tests/_data/transformations/spark_expectations_resources/test_product_rules.sql similarity index 100% rename from test/_data/transformations/spark_expectations_resources/test_product_rules.sql rename to tests/_data/transformations/spark_expectations_resources/test_product_rules.sql diff --git a/test/_data/transformations/string_data/100000_rows_with_strings.json b/tests/_data/transformations/string_data/100000_rows_with_strings.json similarity index 100% rename from test/_data/transformations/string_data/100000_rows_with_strings.json rename to tests/_data/transformations/string_data/100000_rows_with_strings.json diff --git a/test/conftest.py b/tests/conftest.py similarity index 100% rename from test/conftest.py rename to tests/conftest.py diff --git a/test/core/test_context.py b/tests/core/test_context.py similarity index 98% rename from test/core/test_context.py rename to tests/core/test_context.py index b05c89af..920f7382 100644 --- a/test/core/test_context.py +++ b/tests/core/test_context.py @@ -1,6 +1,7 @@ from textwrap import dedent import pytest + from pydantic import SecretStr from koheesio.context import Context @@ -10,15 +11,15 @@ test_context = Context(test_dict) PROJECT_ROOT = get_project_root() -CONTEXT_FOLDER = PROJECT_ROOT / "test" / "_data" / "context" +CONTEXT_FOLDER = PROJECT_ROOT / "tests" / "_data" / "context" SAMPLE_YAML = CONTEXT_FOLDER / "sample.yaml" SAMPLE_JSON = CONTEXT_FOLDER / "sample.json" def test_add(): context = Context(test_dict) - context.add("unit", "test") - assert context.unit == "test" + context.add("unit", "tests") + assert context.unit == "tests" def test_get(): diff --git a/test/core/test_init.py b/tests/core/test_init.py similarity index 100% rename from test/core/test_init.py rename to tests/core/test_init.py diff --git a/test/core/test_logger.py b/tests/core/test_logger.py similarity index 99% rename from test/core/test_logger.py rename to tests/core/test_logger.py index 34ee4025..277d7664 100644 --- a/test/core/test_logger.py +++ b/tests/core/test_logger.py @@ -90,7 +90,7 @@ def test_log_masked(foo, bar, baz, params): log_capture_string = StringIO() ch = logging.StreamHandler(log_capture_string) ch.setLevel(logging.DEBUG) - logger = logging.getLogger("test") + logger = logging.getLogger("tests") logger.setLevel(logging.DEBUG) logger.addHandler(ch) diff --git a/test/core/test_steps.py b/tests/core/test_steps.py similarity index 99% rename from test/core/test_steps.py rename to tests/core/test_steps.py index 41ab9ac3..2d046083 100644 --- a/test/core/test_steps.py +++ b/tests/core/test_steps.py @@ -7,7 +7,9 @@ from unittest.mock import call, patch import pytest + from pydantic import ValidationError + from pyspark.sql import DataFrame from pyspark.sql.functions import lit @@ -197,7 +199,7 @@ def test_step_to_yaml_description_and_name(self): # execute the step test_step.execute() - expected_output_path = PROJECT_ROOT / "test" / "_data" / "steps" + expected_output_path = PROJECT_ROOT / "tests" / "_data" / "steps" # using .strip() to avoid the test failing on leading or trailing whitespaces expected = (expected_output_path / "expected_step_output.yaml").read_text().strip() diff --git a/test/deprecated/nike/ada/__init__.py b/tests/deprecated/nike/ada/__init__.py similarity index 100% rename from test/deprecated/nike/ada/__init__.py rename to tests/deprecated/nike/ada/__init__.py diff --git a/test/models/test_models.py b/tests/models/test_models.py similarity index 99% 
rename from test/models/test_models.py rename to tests/models/test_models.py index ca695db8..3d981f95 100644 --- a/test/models/test_models.py +++ b/tests/models/test_models.py @@ -1,6 +1,6 @@ import json -from textwrap import dedent from typing import Optional +from textwrap import dedent import pytest import yaml @@ -10,7 +10,6 @@ class TestBaseModel: - class SimpleModel(BaseModel): a: int b: str = "default" diff --git a/test/spark/conftest.py b/tests/spark/conftest.py similarity index 97% rename from test/spark/conftest.py rename to tests/spark/conftest.py index 71406bd1..7066ee92 100644 --- a/test/spark/conftest.py +++ b/tests/spark/conftest.py @@ -8,6 +8,7 @@ import pytest from delta import configure_spark_with_delta_pip + from pyspark.sql import DataFrame, SparkSession from pyspark.sql.types import ( ArrayType, @@ -34,7 +35,7 @@ from koheesio.utils import get_project_root PROJECT_ROOT = get_project_root() -TEST_DATA_PATH = Path(PROJECT_ROOT / "test" / "_data") +TEST_DATA_PATH = Path(PROJECT_ROOT / "tests" / "_data") DELTA_FILE = Path(TEST_DATA_PATH / "readers" / "delta_file") @@ -67,6 +68,8 @@ def spark(warehouse_path, random_uuid): .config("spark.sql.warehouse.dir", warehouse_path) .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") .config("spark.sql.session.timeZone", "UTC") + .config("spark.sql.execution.arrow.pyspark.enabled", "true") + .config("spark.sql.execution.arrow.pyspark.fallback.enabled", "true") ) spark_session = configure_spark_with_delta_pip(builder).getOrCreate() diff --git a/test/spark/integrations/box/test_box.py b/tests/spark/integrations/box/test_box.py similarity index 99% rename from test/spark/integrations/box/test_box.py rename to tests/spark/integrations/box/test_box.py index cf851562..5ddf59c5 100644 --- a/test/spark/integrations/box/test_box.py +++ b/tests/spark/integrations/box/test_box.py @@ -3,6 +3,7 @@ from pathlib import PurePath import pytest + from pydantic import ValidationError from koheesio.steps.integrations.box import ( @@ -24,6 +25,8 @@ StructType, ) +pytestmark = pytest.mark.spark + COMMON_PARAMS = { "client_id": SecretStr("client_id"), "client_secret": SecretStr("client_secret"), diff --git a/test/spark/integrations/dq/test_spark_expectations.py b/tests/spark/integrations/dq/test_spark_expectations.py similarity index 89% rename from test/spark/integrations/dq/test_spark_expectations.py rename to tests/spark/integrations/dq/test_spark_expectations.py index 983ff0c4..a4fff146 100644 --- a/test/spark/integrations/dq/test_spark_expectations.py +++ b/tests/spark/integrations/dq/test_spark_expectations.py @@ -1,15 +1,16 @@ from typing import List, Union import pytest + from pyspark.sql import SparkSession -from koheesio.steps.integrations.dq.spark_expectations import ( - SparkExpectationsTransformation, -) from koheesio.utils import get_project_root PROJECT_ROOT = get_project_root() +pytestmark = pytest.mark.spark +pytestmark = pytest.mark.skip(reason="Skipping all tests in this module due to the spark expectation package issues") + class TestSparkExpectationsTransform: """ @@ -45,6 +46,10 @@ def prepare_tables(self, spark, data_path): ) def test_rows_are_dropped(self, spark: SparkSession, prepare_tables): + from koheesio.steps.integrations.dq.spark_expectations import ( + SparkExpectationsTransformation, + ) + input_df = spark.createDataFrame( TestSparkExpectationsTransform.input_data, schema=TestSparkExpectationsTransform.input_schema, @@ -66,6 +71,10 @@ def test_rows_are_dropped(self, spark: 
SparkSession, prepare_tables): assert err_table_df.count() == 2 def test_meta_columns_are_not_dropped(self, spark, prepare_tables): + from koheesio.steps.integrations.dq.spark_expectations import ( + SparkExpectationsTransformation, + ) + input_df = spark.createDataFrame( TestSparkExpectationsTransform.input_data, schema=TestSparkExpectationsTransform.input_schema, @@ -87,6 +96,10 @@ def test_meta_columns_are_not_dropped(self, spark, prepare_tables): assert "meta_dq_run_date" in output_columns or "meta_dq_run_datetime" in output_columns def test_meta_columns_are_dropped(self, spark, prepare_tables): + from koheesio.steps.integrations.dq.spark_expectations import ( + SparkExpectationsTransformation, + ) + input_df = spark.createDataFrame( TestSparkExpectationsTransform.input_data, schema=TestSparkExpectationsTransform.input_schema, @@ -117,6 +130,10 @@ def apply_spark_sql(spark: SparkSession, source_sql_files: Union[List[str], str] spark.sql(content) def test_with_full_se_user_conf(self): + from koheesio.steps.integrations.dq.spark_expectations import ( + SparkExpectationsTransformation, + ) + conf = { "spark.expectations.notifications.email.enabled": False, "spark.expectations.notifications.email.smtp_host": "mailhost.email.com", @@ -143,6 +160,10 @@ def test_with_full_se_user_conf(self): assert instance.se_user_conf == conf def test_overwrite_error_writer(self): + from koheesio.steps.integrations.dq.spark_expectations import ( + SparkExpectationsTransformation, + ) + """ Test that the error_writer can be overwritten using error_writer_mode and error_writer_format """ @@ -168,6 +189,10 @@ def test_overwrite_error_writer(self): assert error_writer._options == {"mergeSchema": "true"} def test_overwrite_stats_writer(self): + from koheesio.steps.integrations.dq.spark_expectations import ( + SparkExpectationsTransformation, + ) + """ Test that the stats_writer can be overwritten using stats_writer_mode and stats_writer_format and that the error_writer_options default = {}. 
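The `pytestmark = pytest.mark.spark` statements being added to the Spark test modules above and below are what the new marker-based selection relies on: a module-level `pytestmark` applies the marker to every test in the file, and the `spark-tests` / `non-spark-tests` scripts then select with `-m spark` or `-m "not spark"`. A minimal sketch of the pattern (the test name here is illustrative, not part of this change; the `spark` fixture is assumed to come from `tests/spark/conftest.py`):

```python
import pytest

# Module-level pytestmark tags every test in this file with the "spark" marker,
# which is declared under [tool.pytest.ini_options] markers in pyproject.toml.
pytestmark = pytest.mark.spark


def test_roundtrip_count(spark):
    # Selected by `pytest -m spark`, skipped by `pytest -m "not spark"`.
    df = spark.createDataFrame([(1, "a")], ["id", "value"])
    assert df.count() == 1
```

With the markers registered in `[tool.pytest.ini_options]`, `hatch run spark-tests` and `hatch run non-spark-tests` expand to the `test` script with `-m spark` and `-m "not spark"` respectively, so only the intended subset of modules runs.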
diff --git a/test/spark/integrations/snowflake/test_snowflake.py b/tests/spark/integrations/snowflake/test_snowflake.py similarity index 99% rename from test/spark/integrations/snowflake/test_snowflake.py rename to tests/spark/integrations/snowflake/test_snowflake.py index 6215494d..a8872c24 100644 --- a/test/spark/integrations/snowflake/test_snowflake.py +++ b/tests/spark/integrations/snowflake/test_snowflake.py @@ -3,6 +3,7 @@ from unittest.mock import Mock, patch import pytest + from pyspark.sql import SparkSession from pyspark.sql import types as t @@ -26,6 +27,8 @@ ) from koheesio.steps.writers import BatchOutputMode +pytestmark = pytest.mark.spark + COMMON_OPTIONS = { "url": "url", "user": "user", diff --git a/test/spark/integrations/snowflake/test_sync_task.py b/tests/spark/integrations/snowflake/test_sync_task.py similarity index 99% rename from test/spark/integrations/snowflake/test_sync_task.py rename to tests/spark/integrations/snowflake/test_sync_task.py index 2563134b..93ef50eb 100644 --- a/test/spark/integrations/snowflake/test_sync_task.py +++ b/tests/spark/integrations/snowflake/test_sync_task.py @@ -4,9 +4,11 @@ from unittest import mock import chispa -import pydantic import pytest from conftest import await_job_completion + +import pydantic + from pyspark.sql import DataFrame from koheesio.steps.delta import DeltaTableStep @@ -20,6 +22,8 @@ from koheesio.steps.writers.delta import DeltaTableWriter from koheesio.steps.writers.stream import ForEachBatchStreamWriter +pytestmark = pytest.mark.spark + COMMON_OPTIONS = { "source_table": DeltaTableStep(table=""), "target_table": "foo.bar", @@ -40,7 +44,7 @@ @pytest.fixture def snowflake_staging_file(): - filename = "test/data/snowflake_staging.parq" + filename = "tests/_data/snowflake_staging.parq" if os.path.exists(filename): shutil.rmtree(filename) diff --git a/test/spark/readers/test_auto_loader.py b/tests/spark/readers/test_auto_loader.py similarity index 97% rename from test/spark/readers/test_auto_loader.py rename to tests/spark/readers/test_auto_loader.py index e0c7f1d7..b72b36b7 100644 --- a/test/spark/readers/test_auto_loader.py +++ b/tests/spark/readers/test_auto_loader.py @@ -1,11 +1,12 @@ -from pathlib import Path - import pytest from chispa import assert_df_equality + from pyspark.sql.types import * from koheesio.steps.readers.databricks.autoloader import AutoLoader +pytestmark = pytest.mark.spark + @pytest.mark.parametrize( "bad_format", diff --git a/test/spark/readers/test_delta_reader.py b/tests/spark/readers/test_delta_reader.py similarity index 99% rename from test/spark/readers/test_delta_reader.py rename to tests/spark/readers/test_delta_reader.py index 7511ba97..0db2db82 100644 --- a/test/spark/readers/test_delta_reader.py +++ b/tests/spark/readers/test_delta_reader.py @@ -1,10 +1,13 @@ import pytest + from pyspark.sql import functions as F from pyspark.sql.dataframe import DataFrame from koheesio.steps.readers.delta import DeltaTableReader from koheesio.steps.spark import AnalysisException +pytestmark = pytest.mark.spark + def test_delta_table_reader(spark): df = DeltaTableReader(table="delta_test_table").read() diff --git a/test/spark/readers/test_file_loader.py b/tests/spark/readers/test_file_loader.py similarity index 99% rename from test/spark/readers/test_file_loader.py rename to tests/spark/readers/test_file_loader.py index a24662bb..01f412e3 100644 --- a/test/spark/readers/test_file_loader.py +++ b/tests/spark/readers/test_file_loader.py @@ -11,6 +11,8 @@ ) from koheesio.steps.spark import 
AnalysisException +pytestmark = pytest.mark.spark + @pytest.fixture() def json_file(data_path): diff --git a/test/spark/readers/test_hana.py b/tests/spark/readers/test_hana.py similarity index 94% rename from test/spark/readers/test_hana.py rename to tests/spark/readers/test_hana.py index 33980f6d..7375038a 100644 --- a/test/spark/readers/test_hana.py +++ b/tests/spark/readers/test_hana.py @@ -1,9 +1,13 @@ from unittest import mock +import pytest + from pyspark.sql import SparkSession from koheesio.steps.readers.hana import HanaReader +pytestmark = pytest.mark.spark + class TestHanaReader: common_options = { diff --git a/test/spark/readers/test_jdbc.py b/tests/spark/readers/test_jdbc.py similarity index 98% rename from test/spark/readers/test_jdbc.py rename to tests/spark/readers/test_jdbc.py index 0816c85c..0dd8c83a 100644 --- a/test/spark/readers/test_jdbc.py +++ b/tests/spark/readers/test_jdbc.py @@ -1,10 +1,13 @@ from unittest import mock import pytest + from pyspark.sql import SparkSession from koheesio.steps.readers.jdbc import JdbcReader +pytestmark = pytest.mark.spark + class TestJdbcReader: common_options = { diff --git a/test/spark/readers/test_memory.py b/tests/spark/readers/test_memory.py similarity index 98% rename from test/spark/readers/test_memory.py rename to tests/spark/readers/test_memory.py index 56af0314..795f6f07 100644 --- a/test/spark/readers/test_memory.py +++ b/tests/spark/readers/test_memory.py @@ -1,9 +1,12 @@ import pytest from chispa import assert_df_equality + from pyspark.sql.types import StructType from koheesio.steps.readers.memory import DataFormat, InMemoryDataReader +pytestmark = pytest.mark.spark + class TestInMemoryDataReader: # fmt: off diff --git a/test/spark/readers/test_metastore_reader.py b/tests/spark/readers/test_metastore_reader.py similarity index 87% rename from test/spark/readers/test_metastore_reader.py rename to tests/spark/readers/test_metastore_reader.py index e107f78c..14fdc08d 100644 --- a/test/spark/readers/test_metastore_reader.py +++ b/tests/spark/readers/test_metastore_reader.py @@ -1,7 +1,11 @@ +import pytest + from pyspark.sql.dataframe import DataFrame from koheesio.steps.readers.metastore import MetastoreReader +pytestmark = pytest.mark.spark + def test_metastore_reader(spark): df = MetastoreReader(table="klettern.delta_test_table").read() diff --git a/test/spark/readers/test_reader.py b/tests/spark/readers/test_reader.py similarity index 89% rename from test/spark/readers/test_reader.py rename to tests/spark/readers/test_reader.py index cbe09983..e5437d00 100644 --- a/test/spark/readers/test_reader.py +++ b/tests/spark/readers/test_reader.py @@ -1,5 +1,9 @@ +import pytest + from koheesio.steps.readers.dummy import DummyReader +pytestmark = pytest.mark.spark + def test_reader(spark): test_reader = DummyReader(range=1) diff --git a/test/spark/readers/test_rest_api.py b/tests/spark/readers/test_rest_api.py similarity index 99% rename from test/spark/readers/test_rest_api.py rename to tests/spark/readers/test_rest_api.py index 9dcc6e9b..a7da1567 100644 --- a/test/spark/readers/test_rest_api.py +++ b/tests/spark/readers/test_rest_api.py @@ -2,9 +2,10 @@ import requests_mock from aiohttp import ClientSession, TCPConnector from aiohttp_retry import ExponentialRetry -from pyspark.sql.types import MapType, StringType, StructField, StructType from yarl import URL +from pyspark.sql.types import MapType, StringType, StructField, StructType + from koheesio.steps.asyncio.http import AsyncHttpStep from koheesio.steps.http import 
PaginatedHtppGetStep from koheesio.steps.readers.rest_api import AsyncHttpGetStep, RestApiReader @@ -12,6 +13,8 @@ ASYNC_BASE_URL = "http://httpbin.org" ASYNC_GET_ENDPOINT = URL(f"{ASYNC_BASE_URL}/get") +pytestmark = pytest.mark.spark + @pytest.fixture def mock_paginated_api(): diff --git a/test/spark/readers/test_spark_sql_reader.py b/tests/spark/readers/test_spark_sql_reader.py similarity index 85% rename from test/spark/readers/test_spark_sql_reader.py rename to tests/spark/readers/test_spark_sql_reader.py index 5d6cdb47..c03f21d6 100644 --- a/test/spark/readers/test_spark_sql_reader.py +++ b/tests/spark/readers/test_spark_sql_reader.py @@ -2,6 +2,8 @@ from koheesio.steps.readers.spark_sql_reader import SparkSqlReader +pytestmark = pytest.mark.spark + def test_spark_sql_reader(spark, data_path): spark.sql("CREATE TABLE IF NOT EXISTS table_A (id INT, a_name STRING) USING DELTA") @@ -27,10 +29,10 @@ def test_spark_sql_reader(spark, data_path): def test_spark_sql_reader_failed(): with pytest.raises(ValueError): - SparkSqlReader(sql="SELECT 1", sql_path="test/resources/sql/none_existent_path.sql") + SparkSqlReader(sql="SELECT 1", sql_path="tests/resources/sql/none_existent_path.sql") with pytest.raises(FileNotFoundError): - SparkSqlReader(sql_path="test/resources/sql/none_existent_path.sql") + SparkSqlReader(sql_path="tests/resources/sql/none_existent_path.sql") with pytest.raises(ValueError): SparkSqlReader(sql=None) diff --git a/test/spark/readers/test_teradata.py b/tests/spark/readers/test_teradata.py similarity index 94% rename from test/spark/readers/test_teradata.py rename to tests/spark/readers/test_teradata.py index c80b8c3a..244c1fb3 100644 --- a/test/spark/readers/test_teradata.py +++ b/tests/spark/readers/test_teradata.py @@ -1,9 +1,13 @@ from unittest import mock +import pytest + from pyspark.sql import SparkSession from koheesio.steps.readers.teradata import TeradataReader +pytestmark = pytest.mark.spark + class TestTeradataReader: common_options = { diff --git a/test/spark/tasks/test_etl_task.py b/tests/spark/tasks/test_etl_task.py similarity index 98% rename from test/spark/tasks/test_etl_task.py rename to tests/spark/tasks/test_etl_task.py index a09ab40d..26a9932c 100644 --- a/test/spark/tasks/test_etl_task.py +++ b/tests/spark/tasks/test_etl_task.py @@ -1,4 +1,6 @@ +import pytest from conftest import await_job_completion + from pyspark.sql import DataFrame, SparkSession from pyspark.sql.functions import col, lit @@ -12,6 +14,8 @@ from koheesio.steps.writers.dummy import DummyWriter from koheesio.tasks.etl_task import EtlTask +pytestmark = pytest.mark.spark + def dummy_function(df: DataFrame): return df.withColumn("hello", lit("world")) diff --git a/test/spark/test_delta.py b/tests/spark/test_delta.py similarity index 99% rename from test/spark/test_delta.py rename to tests/spark/test_delta.py index d0aa8aba..fe79ab87 100644 --- a/test/spark/test_delta.py +++ b/tests/spark/test_delta.py @@ -3,12 +3,16 @@ import pytest from conftest import setup_test_data + from pydantic import ValidationError + from pyspark.sql.types import LongType from koheesio.logger import LoggingFactory from koheesio.steps.delta import DeltaTableStep +pytestmark = pytest.mark.spark + log = LoggingFactory.get_logger(name="test_delta") diff --git a/test/spark/test_spark.py b/tests/spark/test_spark.py similarity index 90% rename from test/spark/test_spark.py rename to tests/spark/test_spark.py index 93f50306..078d9144 100644 --- a/test/spark/test_spark.py +++ b/tests/spark/test_spark.py @@ -13,6 +13,8 @@ 
from koheesio.models import SecretStr +pytestmark = pytest.mark.spark + class TestSparkImportFailures: def test_import_error_no_error(self): @@ -27,6 +29,6 @@ def test_import_error_with_error(self): with pytest.raises(ImportError): from pyspark.sql import SparkSession - SparkSession.builder.appName("test").getOrCreate() + SparkSession.builder.appName("tests").getOrCreate() pass diff --git a/test/spark/test_warnings.py b/tests/spark/test_warnings.py similarity index 96% rename from test/spark/test_warnings.py rename to tests/spark/test_warnings.py index 49168fdc..73c69d1d 100644 --- a/test/spark/test_warnings.py +++ b/tests/spark/test_warnings.py @@ -4,6 +4,8 @@ from koheesio.steps.writers import BatchOutputMode +pytestmark = pytest.mark.spark + @pytest.mark.parametrize("output_mode", [(BatchOutputMode.APPEND)]) def test_muted_warnings(output_mode, dummy_df, spark): diff --git a/test/spark/transformations/date_time/test_date_time.py b/tests/spark/transformations/date_time/test_date_time.py similarity index 99% rename from test/spark/transformations/date_time/test_date_time.py rename to tests/spark/transformations/date_time/test_date_time.py index b644012d..cfb6169c 100644 --- a/test/spark/transformations/date_time/test_date_time.py +++ b/tests/spark/transformations/date_time/test_date_time.py @@ -9,6 +9,8 @@ ToTimestamp, ) +pytestmark = pytest.mark.spark + log = LoggingFactory.get_logger(name="test_date_time") diff --git a/test/spark/transformations/date_time/test_interval.py b/tests/spark/transformations/date_time/test_interval.py similarity index 99% rename from test/spark/transformations/date_time/test_interval.py rename to tests/spark/transformations/date_time/test_interval.py index b6581bbe..5c7d177c 100644 --- a/test/spark/transformations/date_time/test_interval.py +++ b/tests/spark/transformations/date_time/test_interval.py @@ -1,6 +1,7 @@ import datetime as dt import pytest + from pyspark.sql import types as T from koheesio.logger import LoggingFactory @@ -13,6 +14,8 @@ dt_column, ) +pytestmark = pytest.mark.spark + log = LoggingFactory.get_logger(name="test_date_time") diff --git a/test/spark/transformations/strings/test_change_case.py b/tests/spark/transformations/strings/test_change_case.py similarity index 99% rename from test/spark/transformations/strings/test_change_case.py rename to tests/spark/transformations/strings/test_change_case.py index 64599ca5..e745d2bc 100644 --- a/test/spark/transformations/strings/test_change_case.py +++ b/tests/spark/transformations/strings/test_change_case.py @@ -12,6 +12,8 @@ UpperCase, ) +pytestmark = pytest.mark.spark + log = LoggingFactory.get_logger(name=__file__, inherit_from_koheesio=True) data = [ ["Banana lemon orange", 1000, "USA"], diff --git a/test/spark/transformations/strings/test_concat.py b/tests/spark/transformations/strings/test_concat.py similarity index 99% rename from test/spark/transformations/strings/test_concat.py rename to tests/spark/transformations/strings/test_concat.py index 145c2b41..c921ba7c 100644 --- a/test/spark/transformations/strings/test_concat.py +++ b/tests/spark/transformations/strings/test_concat.py @@ -5,6 +5,8 @@ from koheesio.logger import LoggingFactory from koheesio.steps.transformations.strings.concat import Concat +pytestmark = pytest.mark.spark + log = LoggingFactory.get_logger(name=__file__, inherit_from_koheesio=True) diff --git a/test/spark/transformations/strings/test_pad.py b/tests/spark/transformations/strings/test_pad.py similarity index 99% rename from 
test/spark/transformations/strings/test_pad.py rename to tests/spark/transformations/strings/test_pad.py index aca8674e..145f4630 100644 --- a/test/spark/transformations/strings/test_pad.py +++ b/tests/spark/transformations/strings/test_pad.py @@ -8,6 +8,8 @@ from koheesio.models import ValidationError from koheesio.steps.transformations.strings.pad import LPad, Pad, RPad +pytestmark = pytest.mark.spark + log = LoggingFactory.get_logger(name=__file__, inherit_from_koheesio=True) data = [[1, 2, "hello"], [3, 4, "world"], [3, 4, None]] diff --git a/test/spark/transformations/strings/test_regexp.py b/tests/spark/transformations/strings/test_regexp.py similarity index 99% rename from test/spark/transformations/strings/test_regexp.py rename to tests/spark/transformations/strings/test_regexp.py index af4022b6..211a44eb 100644 --- a/test/spark/transformations/strings/test_regexp.py +++ b/tests/spark/transformations/strings/test_regexp.py @@ -7,6 +7,8 @@ from koheesio.logger import LoggingFactory from koheesio.steps.transformations.strings.regexp import RegexpExtract, RegexpReplace +pytestmark = pytest.mark.spark + log = LoggingFactory.get_logger(name=__file__, inherit_from_koheesio=True) data_year_wk = [["2020 W1"], ["2021 WK2"], ["2022WK3"]] diff --git a/test/spark/transformations/strings/test_split.py b/tests/spark/transformations/strings/test_split.py similarity index 99% rename from test/spark/transformations/strings/test_split.py rename to tests/spark/transformations/strings/test_split.py index cd0bf261..91b31a87 100644 --- a/test/spark/transformations/strings/test_split.py +++ b/tests/spark/transformations/strings/test_split.py @@ -7,6 +7,8 @@ from koheesio.logger import LoggingFactory from koheesio.steps.transformations.strings.split import SplitAll, SplitAtFirstMatch +pytestmark = pytest.mark.spark + log = LoggingFactory.get_logger(name=__file__, inherit_from_koheesio=True) string_data = [ diff --git a/test/spark/transformations/strings/test_string_replace.py b/tests/spark/transformations/strings/test_string_replace.py similarity index 98% rename from test/spark/transformations/strings/test_string_replace.py rename to tests/spark/transformations/strings/test_string_replace.py index 0beeb684..d9280bb8 100644 --- a/test/spark/transformations/strings/test_string_replace.py +++ b/tests/spark/transformations/strings/test_string_replace.py @@ -7,6 +7,8 @@ from koheesio.logger import LoggingFactory from koheesio.steps.transformations.strings.replace import Replace +pytestmark = pytest.mark.spark + log = LoggingFactory.get_logger(name=__file__, inherit_from_koheesio=True) data_with_strings = [[1, 2, "hello"], [3, 4, "world"], [3, 4, None]] diff --git a/test/spark/transformations/strings/test_substring.py b/tests/spark/transformations/strings/test_substring.py similarity index 98% rename from test/spark/transformations/strings/test_substring.py rename to tests/spark/transformations/strings/test_substring.py index 4db189a7..1be99f53 100644 --- a/test/spark/transformations/strings/test_substring.py +++ b/tests/spark/transformations/strings/test_substring.py @@ -7,6 +7,8 @@ from koheesio.logger import LoggingFactory from koheesio.steps.transformations.strings.substring import Substring +pytestmark = pytest.mark.spark + log = LoggingFactory.get_logger(name=__file__, inherit_from_koheesio=True) data_with_strings = [[1, 2, "hello"], [3, 4, "world"], [3, 4, None]] diff --git a/test/spark/transformations/strings/test_trim.py b/tests/spark/transformations/strings/test_trim.py similarity index 99% rename 
from test/spark/transformations/strings/test_trim.py rename to tests/spark/transformations/strings/test_trim.py index 72325b96..742279b2 100644 --- a/test/spark/transformations/strings/test_trim.py +++ b/tests/spark/transformations/strings/test_trim.py @@ -3,11 +3,14 @@ """ import pytest + from pydantic import ValidationError from koheesio.logger import LoggingFactory from koheesio.steps.transformations.strings.trim import LTrim, RTrim, Trim +pytestmark = pytest.mark.spark + log = LoggingFactory.get_logger(name=__file__, inherit_from_koheesio=True) diff --git a/test/spark/transformations/test_arrays.py b/tests/spark/transformations/test_arrays.py similarity index 99% rename from test/spark/transformations/test_arrays.py rename to tests/spark/transformations/test_arrays.py index 6a71bb2f..61ae1c60 100644 --- a/test/spark/transformations/test_arrays.py +++ b/tests/spark/transformations/test_arrays.py @@ -1,6 +1,7 @@ import math import pytest + from pyspark.sql.types import ( ArrayType, FloatType, @@ -25,6 +26,8 @@ ExplodeDistinct, ) +pytestmark = pytest.mark.spark + input_data = [ (1, [1, 2, 2, 3, 3, 3], ["a", "b", "b", "c", "c", " ", None], [1.0, 2.0, 2.0, 3.0, 3.0, 3.0, float("nan")]), (2, [4, 4, 5, 5, 6, 6, None], ["d", "e", "e", "f", "f", "f"], [4.0, 5.0, 5.0, 6.0, 6.0, 6.0]), diff --git a/test/spark/transformations/test_camel_to_snake_transform.py b/tests/spark/transformations/test_camel_to_snake_transform.py similarity index 97% rename from test/spark/transformations/test_camel_to_snake_transform.py rename to tests/spark/transformations/test_camel_to_snake_transform.py index f46dfdcb..fe31b503 100644 --- a/test/spark/transformations/test_camel_to_snake_transform.py +++ b/tests/spark/transformations/test_camel_to_snake_transform.py @@ -1,9 +1,12 @@ import pytest + from pyspark.sql import functions as F from koheesio.steps.readers.dummy import DummyReader from koheesio.steps.transformations.camel_to_snake import CamelToSnakeTransformation +pytestmark = pytest.mark.spark + class TestCamelToSnakeTransformation: @pytest.fixture diff --git a/test/spark/transformations/test_cast_to_datatype.py b/tests/spark/transformations/test_cast_to_datatype.py similarity index 99% rename from test/spark/transformations/test_cast_to_datatype.py rename to tests/spark/transformations/test_cast_to_datatype.py index 4447db5a..11ee7c20 100644 --- a/test/spark/transformations/test_cast_to_datatype.py +++ b/tests/spark/transformations/test_cast_to_datatype.py @@ -6,7 +6,9 @@ from decimal import Decimal import pytest + from pydantic import ValidationError + from pyspark.sql import DataFrame from pyspark.sql import functions as f @@ -27,6 +29,8 @@ ) from koheesio.utils import SparkDatatype +pytestmark = pytest.mark.spark + @pytest.mark.parametrize( "input_values,expected", diff --git a/test/spark/transformations/test_drop_column.py b/tests/spark/transformations/test_drop_column.py similarity index 98% rename from test/spark/transformations/test_drop_column.py rename to tests/spark/transformations/test_drop_column.py index 42ab9f7f..a0427b72 100644 --- a/test/spark/transformations/test_drop_column.py +++ b/tests/spark/transformations/test_drop_column.py @@ -2,6 +2,8 @@ from koheesio.steps.transformations.drop_column import DropColumn +pytestmark = pytest.mark.spark + @pytest.mark.parametrize( "input_values,expected", diff --git a/test/spark/transformations/test_get_item.py b/tests/spark/transformations/test_get_item.py similarity index 98% rename from test/spark/transformations/test_get_item.py rename to 
tests/spark/transformations/test_get_item.py index abd7fc1d..cc3d632c 100644 --- a/test/spark/transformations/test_get_item.py +++ b/tests/spark/transformations/test_get_item.py @@ -2,6 +2,8 @@ from koheesio.steps.transformations.get_item import GetItem +pytestmark = pytest.mark.spark + @pytest.mark.parametrize( "input_values,input_data,input_schema,expected", diff --git a/test/spark/transformations/test_hash.py b/tests/spark/transformations/test_hash.py similarity index 99% rename from test/spark/transformations/test_hash.py rename to tests/spark/transformations/test_hash.py index ef4452ae..1c0245c7 100644 --- a/test/spark/transformations/test_hash.py +++ b/tests/spark/transformations/test_hash.py @@ -5,6 +5,8 @@ from koheesio.logger import LoggingFactory from koheesio.steps.transformations.hash import Sha2Hash +pytestmark = pytest.mark.spark + log = LoggingFactory.get_logger(name="test_hash") diff --git a/test/spark/transformations/test_lookup.py b/tests/spark/transformations/test_lookup.py similarity index 98% rename from test/spark/transformations/test_lookup.py rename to tests/spark/transformations/test_lookup.py index 4398ccd4..890d7c37 100644 --- a/test/spark/transformations/test_lookup.py +++ b/tests/spark/transformations/test_lookup.py @@ -1,4 +1,5 @@ import pytest + from pyspark.sql import SparkSession from koheesio.steps.transformations.lookup import ( @@ -9,6 +10,8 @@ TargetColumn, ) +pytestmark = pytest.mark.spark + def test_join_mapping_column_test() -> None: assert str(JoinMapping(source_column="source", other_column="joined").column) == "Column<'joined AS source'>" diff --git a/test/spark/transformations/test_repartition.py b/tests/spark/transformations/test_repartition.py similarity index 98% rename from test/spark/transformations/test_repartition.py rename to tests/spark/transformations/test_repartition.py index 780fee95..ab557870 100644 --- a/test/spark/transformations/test_repartition.py +++ b/tests/spark/transformations/test_repartition.py @@ -3,6 +3,8 @@ from koheesio.models import ValidationError from koheesio.steps.transformations.repartition import Repartition +pytestmark = pytest.mark.spark + @pytest.mark.parametrize( "input_values,expected", diff --git a/test/spark/transformations/test_replace.py b/tests/spark/transformations/test_replace.py similarity index 99% rename from test/spark/transformations/test_replace.py rename to tests/spark/transformations/test_replace.py index 7f54fbf4..e8ad7491 100644 --- a/test/spark/transformations/test_replace.py +++ b/tests/spark/transformations/test_replace.py @@ -3,6 +3,8 @@ from koheesio.logger import LoggingFactory from koheesio.steps.transformations.replace import Replace +pytestmark = pytest.mark.spark + @pytest.mark.parametrize( "input_values,expected", diff --git a/test/spark/transformations/test_row_number_dedup.py b/tests/spark/transformations/test_row_number_dedup.py similarity index 99% rename from test/spark/transformations/test_row_number_dedup.py rename to tests/spark/transformations/test_row_number_dedup.py index b9ad86bc..aafe55c0 100644 --- a/test/spark/transformations/test_row_number_dedup.py +++ b/tests/spark/transformations/test_row_number_dedup.py @@ -1,12 +1,15 @@ from datetime import datetime import pytest + from pyspark.sql import SparkSession from pyspark.sql import functions as F from koheesio.steps.transformations.rank_dedup import RankDedup from koheesio.steps.transformations.row_number_dedup import RowNumberDedup +pytestmark = pytest.mark.spark + @pytest.mark.parametrize("target_column", 
["col_row_nuber"]) def test_row_number_dedup(spark: SparkSession, target_column: str) -> None: diff --git a/test/spark/transformations/test_sql_transform.py b/tests/spark/transformations/test_sql_transform.py similarity index 99% rename from test/spark/transformations/test_sql_transform.py rename to tests/spark/transformations/test_sql_transform.py index edcccdce..0b60f512 100644 --- a/test/spark/transformations/test_sql_transform.py +++ b/tests/spark/transformations/test_sql_transform.py @@ -6,6 +6,8 @@ from koheesio.logger import LoggingFactory from koheesio.steps.transformations.sql_transform import SqlTransform +pytestmark = pytest.mark.spark + log = LoggingFactory.get_logger(name="test_sql_transform") diff --git a/test/spark/transformations/test_transform.py b/tests/spark/transformations/test_transform.py similarity index 98% rename from test/spark/transformations/test_transform.py rename to tests/spark/transformations/test_transform.py index 9233e4e5..d39e09ca 100644 --- a/test/spark/transformations/test_transform.py +++ b/tests/spark/transformations/test_transform.py @@ -1,11 +1,15 @@ from typing import Any, Dict +import pytest + from pyspark.sql import DataFrame from pyspark.sql import functions as f from koheesio.logger import LoggingFactory from koheesio.steps.transformations.transform import Transform +pytestmark = pytest.mark.spark + log = LoggingFactory.get_logger(name="test_transform") diff --git a/test/spark/transformations/test_transformation.py b/tests/spark/transformations/test_transformation.py similarity index 99% rename from test/spark/transformations/test_transformation.py rename to tests/spark/transformations/test_transformation.py index 5659984c..74826764 100644 --- a/test/spark/transformations/test_transformation.py +++ b/tests/spark/transformations/test_transformation.py @@ -1,4 +1,5 @@ import pytest + from pyspark.sql import Column from pyspark.sql import functions as f @@ -9,6 +10,8 @@ from koheesio.steps.transformations.dummy import DummyTransformation from koheesio.utils import SparkDatatype +pytestmark = pytest.mark.spark + def test_transform(dummy_df): tf = DummyTransformation() diff --git a/test/spark/transformations/test_uuid5.py b/tests/spark/transformations/test_uuid5.py similarity index 98% rename from test/spark/transformations/test_uuid5.py rename to tests/spark/transformations/test_uuid5.py index 2154f90b..3d864b75 100644 --- a/test/spark/transformations/test_uuid5.py +++ b/tests/spark/transformations/test_uuid5.py @@ -3,6 +3,8 @@ from koheesio.logger import LoggingFactory from koheesio.steps.transformations.uuid5 import HashUUID5 +pytestmark = pytest.mark.spark + log = LoggingFactory.get_logger(name="test_uuid5") diff --git a/test/spark/writers/delta/test_delta_writer.py b/tests/spark/writers/delta/test_delta_writer.py similarity index 99% rename from test/spark/writers/delta/test_delta_writer.py rename to tests/spark/writers/delta/test_delta_writer.py index c22ca3fc..576e9e05 100644 --- a/test/spark/writers/delta/test_delta_writer.py +++ b/tests/spark/writers/delta/test_delta_writer.py @@ -4,7 +4,9 @@ import pytest from conftest import await_job_completion from delta import DeltaTable + from pydantic import ValidationError + from pyspark.sql import functions as F from koheesio.steps.delta import DeltaTableStep @@ -14,6 +16,8 @@ from koheesio.steps.writers.delta.utils import log_clauses from koheesio.steps.writers.stream import Trigger +pytestmark = pytest.mark.spark + @pytest.mark.parametrize( "output_mode,expected_count", diff --git 
a/test/spark/writers/delta/test_scd.py b/tests/spark/writers/delta/test_scd.py similarity index 99% rename from test/spark/writers/delta/test_scd.py rename to tests/spark/writers/delta/test_scd.py index 72ef9fa7..7a79e45b 100644 --- a/test/spark/writers/delta/test_scd.py +++ b/tests/spark/writers/delta/test_scd.py @@ -1,9 +1,12 @@ import datetime from typing import List, Optional +import pytest from delta import DeltaTable from delta.tables import DeltaMergeBuilder + from pydantic import Field + from pyspark.sql import Column from pyspark.sql import functions as F from pyspark.sql.types import Row @@ -12,6 +15,8 @@ from koheesio.steps.spark import DataFrame, current_timestamp_utc from koheesio.steps.writers.delta.scd import SCD2DeltaTableWriter +pytestmark = pytest.mark.spark + def test_scd2_custom_logic(spark): def _get_result(target_df: DataFrame, expr: str): diff --git a/test/spark/writers/test_buffer.py b/tests/spark/writers/test_buffer.py similarity index 99% rename from test/spark/writers/test_buffer.py rename to tests/spark/writers/test_buffer.py index 51e87f3a..62a25167 100644 --- a/test/spark/writers/test_buffer.py +++ b/tests/spark/writers/test_buffer.py @@ -3,6 +3,7 @@ from importlib.util import find_spec import pytest + from pyspark.sql.types import ( BooleanType, FloatType, @@ -14,6 +15,8 @@ from koheesio.steps.writers.buffer import PandasCsvBufferWriter, PandasJsonBufferWriter +pytestmark = pytest.mark.spark + # Test data has two columns: email and sha256_email test_data = [ ( diff --git a/test/spark/writers/test_dummy.py b/tests/spark/writers/test_dummy.py similarity index 94% rename from test/spark/writers/test_dummy.py rename to tests/spark/writers/test_dummy.py index 831b6372..69f9ea63 100644 --- a/test/spark/writers/test_dummy.py +++ b/tests/spark/writers/test_dummy.py @@ -2,6 +2,8 @@ from koheesio.steps.writers.dummy import DummyWriter +pytestmark = pytest.mark.spark + expected = {"id": 0} diff --git a/test/spark/writers/test_sftp.py b/tests/spark/writers/test_sftp.py similarity index 99% rename from test/spark/writers/test_sftp.py rename to tests/spark/writers/test_sftp.py index 685236fc..c0e454e7 100644 --- a/test/spark/writers/test_sftp.py +++ b/tests/spark/writers/test_sftp.py @@ -12,6 +12,8 @@ SFTPWriter, ) +pytestmark = pytest.mark.spark + @pytest.fixture def buffer_writer(spark): diff --git a/test/spark/writers/test_stream.py b/tests/spark/writers/test_stream.py similarity index 98% rename from test/spark/writers/test_stream.py rename to tests/spark/writers/test_stream.py index 419f3dae..dc125b7c 100644 --- a/test/spark/writers/test_stream.py +++ b/tests/spark/writers/test_stream.py @@ -4,6 +4,8 @@ from koheesio.steps.writers.stream import Trigger +pytestmark = pytest.mark.spark + @pytest.mark.parametrize( "args, expected", diff --git a/test/steps/asyncio/test_asyncio_http.py b/tests/steps/asyncio/test_asyncio_http.py similarity index 99% rename from test/steps/asyncio/test_asyncio_http.py rename to tests/steps/asyncio/test_asyncio_http.py index 09ff0f78..acf4a76f 100644 --- a/test/steps/asyncio/test_asyncio_http.py +++ b/tests/steps/asyncio/test_asyncio_http.py @@ -3,9 +3,10 @@ import pytest from aiohttp import ClientResponseError, ClientSession, TCPConnector from aiohttp_retry import ExponentialRetry -from pydantic import ValidationError from yarl import URL +from pydantic import ValidationError + from koheesio.steps.asyncio.http import AsyncHttpStep from koheesio.steps.http import HttpMethod diff --git a/test/steps/integrations/notifications/test_slack.py 
b/tests/steps/integrations/notifications/test_slack.py similarity index 100% rename from test/steps/integrations/notifications/test_slack.py rename to tests/steps/integrations/notifications/test_slack.py diff --git a/test/steps/integrations/sso/test_okta.py b/tests/steps/integrations/sso/test_okta.py similarity index 97% rename from test/steps/integrations/sso/test_okta.py rename to tests/steps/integrations/sso/test_okta.py index 7b31373f..09a5a608 100644 --- a/test/steps/integrations/sso/test_okta.py +++ b/tests/steps/integrations/sso/test_okta.py @@ -2,9 +2,10 @@ from io import StringIO import pytest -from pydantic import SecretStr from requests_mock.mocker import Mocker +from pydantic import SecretStr + from koheesio.steps.integrations.sso import okta as o @@ -45,7 +46,7 @@ def test_log_extra_params_secret(self): log_capture_string = StringIO() ch = logging.StreamHandler(log_capture_string) ch.setLevel(logging.DEBUG) - logger = logging.getLogger("test") + logger = logging.getLogger("tests") logger.addHandler(ch) secret_val = "secret_value" oat = o.OktaAccessToken( diff --git a/test/steps/test_http.py b/tests/steps/test_http.py similarity index 100% rename from test/steps/test_http.py rename to tests/steps/test_http.py diff --git a/test/utils/test_utils.py b/tests/utils/test_utils.py similarity index 99% rename from test/utils/test_utils.py rename to tests/utils/test_utils.py index 22919506..9b2acbaa 100644 --- a/test/utils/test_utils.py +++ b/tests/utils/test_utils.py @@ -2,6 +2,7 @@ from unittest.mock import patch import pytest + from pyspark.sql.types import StringType, StructField, StructType from koheesio.utils import (