+-----
+
+A framework for building advanced data pipelines from simple Python implementations
## Introduction
Koheesio is aimed at those building data processing pipelines in Python and is particularly useful for large-scale data.
-Koheesio, named after the Finnish word for cohesion, is a Python framework for building type-safe and efficient Data
-Pipelines. It promotes modularity, composability, and collaboration, allowing for the creation of complex pipelines
+Koheesio, named after the Finnish word for cohesion, is a Python framework for building type-safe and efficient Data
+Pipelines. It promotes modularity, composability, and collaboration, allowing for the creation of complex pipelines
from simple, reusable components.
-Koheesio's goal is to ensure predictable pipeline execution through a solid foundation of well-tested code and a rich
-set of features, making it an excellent choice for developers and organizations seeking to build robust and adaptable
+Koheesio's goal is to ensure predictable pipeline execution through a solid foundation of well-tested code and a rich
+set of features, making it an excellent choice for developers and organizations seeking to build robust and adaptable
Data Pipelines.
- It is designed to be modular and composable, allowing for the creation of complex pipelines from simple components.
@@ -100,17 +83,17 @@ Additionally, Koheesio also uses [Pydantic] for strong typing, data validation,
Here are the key components included in Koheesio:
-* __Step__: This is the fundamental unit of work in Koheesio. It represents a single operation in a data pipeline,
+- __Step__: This is the fundamental unit of work in Koheesio. It represents a single operation in a data pipeline,
taking in inputs and producing outputs.
-* __Task__: This is a larger unit of work, typically encompassing an entire Extract - Transform - Load process for a
+- __Task__: This is a larger unit of work, typically encompassing an entire Extract - Transform - Load process for a
data object.
-* __Context__: This is a configuration class used to set up the environment for a Task. It can be used to share
+- __Context__: This is a configuration class used to set up the environment for a Task. It can be used to share
variables across tasks and adapt the behavior of a Task based on its environment.
-* __Logger__: This is a class for logging messages at different levels.
-* __Reader__: This is a type of Step that reads data from a source and stores the result (to make it available for
+- __Logger__: This is a class for logging messages at different levels.
+- __Reader__: This is a type of Step that reads data from a source and stores the result (to make it available for
subsequent steps)
-* __Writer__: This controls how data is written to the output in both batch and streaming contexts.
-* __Transformation__: This is a type of Step that takes a DataFrame as input and returns a DataFrame as output.
+- __Writer__: This controls how data is written to the output in both batch and streaming contexts.
+- __Transformation__: This is a type of Step that takes a DataFrame as input and returns a DataFrame as output.
```text
┌─────────┐ ┌──────────────────┐ ┌─────────┐
@@ -131,66 +114,70 @@ The interactions between the base concepts of the model is visible in the below
The sections below provide more detail on each Core Component.
### Step
-The Step is the fundamental unit of work in Koheesio. It represents a single operation in a data pipeline, taking in
+
+The Step is the fundamental unit of work in Koheesio. It represents a single operation in a data pipeline, taking in
inputs and producing outputs.
-For example, a Step could be a process that takes in raw data, cleans it, and outputs the cleaned data. Each Step is
+For example, a Step could be a process that takes in raw data, cleans it, and outputs the cleaned data. Each Step is
designed to be self-contained and reusable, meaning it can be used in multiple tasks or pipelines without modification.
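As a minimal sketch (the nested `Output` model and the `Field` declarations follow the Step/StepOutput pattern; treat the exact signatures as illustrative rather than canonical):

```python
from koheesio.models import Field
from koheesio.steps import Step, StepOutput


class Greet(Step):
    """Toy Step: turns a name into a greeting."""

    name: str = Field(default="world", description="Who to greet")

    class Output(StepOutput):
        greeting: str = Field(default=..., description="The greeting that was produced")

    def execute(self):
        # results go on the Step's output so that subsequent steps can consume them
        self.output.greeting = f"Hello, {self.name}!"


step = Greet(name="Koheesio")
step.execute()
print(step.output.greeting)  # Hello, Koheesio!
```

Because inputs and outputs are Pydantic models, they are validated and self-documenting.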
### Task
-A Task in Koheesio is a larger unit of work, typically encompassing an entire Extract - Transform - Load (ETL) process
+
+A Task in Koheesio is a larger unit of work, typically encompassing an entire Extract - Transform - Load (ETL) process
for a data object.
-For example, a Task could involve extracting data from a database, transforming the data using several Steps, and then
+For example, a Task could involve extracting data from a database, transforming the data using several Steps, and then
loading the transformed data into a data warehouse. Tasks are designed to be modular and composable, allowing developers
to build complex data pipelines by combining multiple Tasks.
Task is a subclass of Step.
### Context
-The Context in Koheesio is a configuration class/interface used to set up the environment for a Task. It can be used to
+
+The Context in Koheesio is a configuration class/interface used to set up the environment for a Task. It can be used to
share variables across tasks and adapt the behavior of a Task based on its environment.
-For example, a Context could be used to specify the database connection details for a Task, or to specify the output
-format for a Writer. The Context is designed to be flexible and adaptable, allowing developers to easily configure their
-tasks to suit different environments or requirements. Note that the use of Context is not bound to Tasks; Task is used
+For example, a Context could be used to specify the database connection details for a Task, or to specify the output
+format for a Writer. The Context is designed to be flexible and adaptable, allowing developers to easily configure their
+tasks to suit different environments or requirements. Note that the use of Context is not bound to Tasks; Task is used
illustratively in this description.
-Context can be created from yaml files, dictionaries, or json files. It also possible to merge multiple Contexts
+Context can be created from yaml files, dictionaries, or json files. It is also possible to merge multiple Contexts
together. Koheesio also has the notion of a 'SecretContext', which can be used to store sensitive information such as
passwords or API keys without exposing them in the code.
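A small illustration (the keys are made up; mapping-style access is inferred from the `Mapping` base used by `koheesio.context`, so treat the details as assumptions):

```python
from koheesio.context import Context

# build a Context from keyword arguments; a dictionary, YAML file, or JSON file
# could serve as the source just as well
context = Context(env="dev", source_path="data/input")

# a Context behaves like a mapping, so tasks can look up whatever they need
print(context.get("env"))          # dev
print(context.get("source_path"))  # data/input
```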
### Logger
-The Logger in Koheesio is a class/interface for logging messages at different levels. It can be used to log debug
+
+The Logger in Koheesio is a class/interface for logging messages at different levels. It can be used to log debug
messages, informational messages, warnings, errors, and critical errors.
-For example, a Logger could be used to log the progress of a Task, or to log any errors that occur during the execution
-of a Task. The Logger is designed to be easy to use and flexible, allowing developers to easily monitor the progress and
+For example, a Logger could be used to log the progress of a Task, or to log any errors that occur during the execution
+of a Task. The Logger is designed to be easy to use and flexible, allowing developers to easily monitor the progress and
status of their tasks.
The Logger API is designed to mimic that of Scala loggers.
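A minimal, hypothetical illustration (the `LoggingFactory.get_logger` call and its arguments are assumptions; the logging docs linked below contain the maintained examples):

```python
from koheesio.logger import LoggingFactory

# hypothetical: obtain a named logger and emit messages at different levels
logger = LoggingFactory.get_logger(name="my_pipeline")
logger.info("pipeline started")
logger.warning("input folder is empty")
logger.error("failed to write output")
```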
Click [here](docs/concepts/logging.md) to see logger examples.
-
### Reader
-The Reader in Koheesio is a type of Step that reads data from a source and stores the result. For example, a Reader
-could be used to read data from a CSV file, a database, or an API. The Reader is designed to be flexible and adaptable,
+
+The Reader in Koheesio is a type of Step that reads data from a source and stores the result. For example, a Reader
+could be used to read data from a CSV file, a database, or an API. The Reader is designed to be flexible and adaptable,
allowing developers to easily read data from a wide range of sources.
Reader is a subclass of Step.
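As a concrete example, the `FileLoader` reader (its docstring appears further down in this change) can be used along these lines; the path and options are placeholders:

```python
from koheesio.steps.readers.file_loader import FileLoader

# read a text file into a Spark DataFrame; the DataFrame is stored on the
# reader's output so that subsequent steps can pick it up
reader = FileLoader(path="path/to/textfile.txt", format="text", header=True, lineSep="\n")
reader.execute()
df = reader.output.df
```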
-
### Writer
-The Writer in Koheesio controls how data is written to the output in both batch and streaming contexts. For example, a
-Writer could be used to write data to a CSV file, a database, or an API. The Writer is designed to be flexible and
+
+The Writer in Koheesio controls how data is written to the output in both batch and streaming contexts. For example, a
+Writer could be used to write data to a CSV file, a database, or an API. The Writer is designed to be flexible and
adaptable, allowing developers to easily write data to a wide range of destinations.
Writer is a subclass of Step.
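For example, the Box integration's `BoxFileWriter` (requires the `box` extra; adapted from its docstring later in this change, with `auth_params` left as a placeholder for your Box credentials):

```python
from koheesio.steps.integrations.box import BoxFileWriter

auth_params = {...}  # placeholder: your Box JWT auth settings go here
BoxFileWriter(**auth_params, path="/foo/bar", file="path/to/my/file.ext").execute()
```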
-
### Transformation
-The Transformation in Koheesio is a type of Step that takes a DataFrame as input and returns a DataFrame as output. For
-example, a Transformation could be used to clean data, aggregate data, or perform complex calculations on data. The
-Transformation is designed to be flexible and powerful, allowing developers to easily perform a wide range of data
+
+The Transformation in Koheesio is a type of Step that takes a DataFrame as input and returns a DataFrame as output. For
+example, a Transformation could be used to clean data, aggregate data, or perform complex calculations on data. The
+Transformation is designed to be flexible and powerful, allowing developers to easily perform a wide range of data
transformations.
Transformation is a subclass of Step.
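The `AddOne` example from the Transformation docstring (visible later in this change) shows the typical shape: implement `execute`, read `self.df`, and assign the result to `self.output.df`:

```python
from pyspark.sql import functions as f

from koheesio.steps.transformations import Transformation


class AddOne(Transformation):
    """Adds 1 to 'old_column' and stores the result in 'new_column'."""

    def execute(self):
        self.output.df = self.df.withColumn("new_column", f.col("old_column") + 1)


# given an existing Spark DataFrame `input_df` that has an 'old_column' column:
output_df = AddOne().transform(input_df)
```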
@@ -199,7 +186,6 @@ Transformation is a subclass of Step.
You can install Koheesio using either pip or poetry.
-
### Using Pip
To install Koheesio using pip, run the following command in your terminal:
@@ -208,14 +194,15 @@ To install Koheesio using pip, run the following command in your terminal:
pip install koheesio
```
-
### Using Poetry
If you're using poetry for package management, you can add Koheesio to your project with the following command:
+
```bash
poetry add koheesio
```
-or add the following line to your `pyproject.toml` (under `[tool.poetry.dependencies]`), making sure to replace
+
+or add the following line to your `pyproject.toml` (under `[tool.poetry.dependencies]`), making sure to replace
`...` with the version you want to have installed:
```toml
@@ -223,30 +210,27 @@ koheesio = {version = "..."}
```
### Extras
+
Koheesio also provides some additional features that can be useful in certain scenarios. These include:
-- **Spark Expectations:** Available through the `koheesio.steps.integration.dq` module; installable through the `se` extra.
- - SE Provides Data Quality checks for Spark DataFrames.
- - For more information, refer to the [Spark Expectations docs](https://engineering.nike.com/spark-expectations).
+- __Spark Expectations:__ Available through the `koheesio.steps.integration.dq` module; installable through the `se` extra.
+  - SE provides Data Quality checks for Spark DataFrames.
+ - For more information, refer to the [Spark Expectations docs](https://engineering.nike.com/spark-expectations).
[//]: # (- **Brickflow:** Available through the `koheesio.steps.integration.workflow` module; installable through the `bf` extra.)
[//]: # ( - Brickflow is a workflow orchestration tool that allows you to define and execute workflows in a declarative way.)
[//]: # ( - For more information, refer to the [Brickflow docs](https://engineering.nike.com/brickflow))
-- **Box**: Available through the `koheesio.steps.integration.box` module; installable through the `box` extra.
- - Box is a cloud content management and file sharing service for businesses.
+- __Box__: Available through the `koheesio.steps.integration.box` module; installable through the `box` extra.
+ - Box is a cloud content management and file sharing service for businesses.
-- **Cerberus**: Available through the `koheesio.steps.integration.secrets` module; installable through the `cerberus` extra.
- - Cerberus is a tool for managing secrets in a secure and scalable way.
- - For more information, refer to the [Cerberus docs](https://engineering.nike.com/cerberus)
-
-> **Note:**
+> __Note:__
> Some of the steps require extra dependencies. See the [Extras](#extras) section for additional info.
> Extras can be added to Poetry by adding `extras=['name_of_the_extra']` to the toml entry mentioned above, as shown below.
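For example, to pull in the Box integration (extra names follow the list above):

```bash
pip install "koheesio[box]"
```

or, with Poetry, extend the `pyproject.toml` entry shown earlier:

```toml
koheesio = {version = "...", extras = ["box"]}
```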
## Getting Started
-In this section, we will provide an example of how you can use Koheesio. This example uses all the Core Concepts
+In this section, we will provide an example of how you can use Koheesio. This example uses all the Core Concepts
(except an explicit logger) to show how they can be combined end-to-end.
Consider this code snippet:
@@ -336,7 +320,7 @@ Based on the `DummyReader()`, we start with an input DataFrame that looks like t
The transform method we specified in our custom Task adds a new column called "MyFavoriteMovie" to the input DataFrame, filled with the value of `my_favorite_movie` that we provided while initializing the class.
-Given that the `CamelToSnakeTransformation` changes the column names into snake case, the output DataFrame from `task.run()` will have two columns: 'id' and 'my_favorite_movie'. The 'id' column will contain 100 rows of data (as provided by DummyReader), and the 'my_favorite_movie' column will contain the same value "inception" for all rows.
+Given that the `CamelToSnakeTransformation` changes the column names into snake case, the output DataFrame from `task.run()` will have two columns: 'id' and 'my_favorite_movie'. The 'id' column will contain 100 rows of data (as provided by DummyReader), and the 'my_favorite_movie' column will contain the same value "inception" for all rows.
The DummyWriter then performs a df.show() on the DataFrame and returns the first row of data as a dict. So, the final output of task.run() would be a dictionary representing the first row of the DataFrame, something like this:
@@ -355,53 +339,58 @@ The data in this DataFrame looks like this:
| 100 | inception |
## Custom Implementations
-In Koheesio, custom implementations can replace the provided implementations as long as they match the API. Here are
+
+In Koheesio, custom implementations can replace the provided implementations as long as they match the API. Here are
some examples:
-1. __Custom Step__: You can create a custom Step that performs a specific operation not covered by the built-in Steps.
+1. __Custom Step__: You can create a custom Step that performs a specific operation not covered by the built-in Steps.
For example, you might create a Step that performs a specific type of data validation or a complex transformation.
+
```python
class CustomStep(Step):
def execute(self):
        # Implement your custom logic here
        ...
```
-2. __Custom Reader__: You can create a custom Reader that reads data from a specific source not covered by the built-in
+2. __Custom Reader__: You can create a custom Reader that reads data from a specific source not covered by the built-in
Readers. For example, you might create a Reader that reads data from a proprietary database or a specific API.
+
```python
class CustomReader(Reader):
def execute(self):
        # Implement your custom logic here
        ...
```
-3. __Custom Writer__: You can create a custom Writer that writes data to a specific destination not covered by the
- built-in Writers. For example, you might create a Writer that writes data to a proprietary database or a specific
+3. __Custom Writer__: You can create a custom Writer that writes data to a specific destination not covered by the
+ built-in Writers. For example, you might create a Writer that writes data to a proprietary database or a specific
API.
+
```python
class CustomWriter(Writer):
def execute(self):
        # Implement your custom logic here
        ...
```
-4. __Custom Transformation__: You can create a custom Transformation that performs a specific transformation not
- covered by the built-in Transformations. For example, you might create a Transformation that performs a complex
+4. __Custom Transformation__: You can create a custom Transformation that performs a specific transformation not
+ covered by the built-in Transformations. For example, you might create a Transformation that performs a complex
calculation or a specific type of data cleaning.
+
```python
class CustomTransformation(Transformation):
def execute(self):
        # Implement your custom logic here
        ...
```
-Note that in all the above examples, the `execute` method is the only method that needs to be implemented. This is
+Note that in all the above examples, the `execute` method is the only method that needs to be implemented. This is
by design.
-Remember, the key to creating custom implementations in Koheesio is to ensure they match the API of the component
-they're replacing. This ensures they can be used interchangeably with the built-in components. Check out the API
+Remember, the key to creating custom implementations in Koheesio is to ensure they match the API of the component
+they're replacing. This ensures they can be used interchangeably with the built-in components. Check out the API
Documentation for more details on the many different components available in Koheesio.
-Before creating custom implementations, it's a good idea to check if there's already a built-in component that can be
-used instead. This will save you time and effort in the long run. When you do create custom implementations, read
-through the API documentation belonging to the Step and StepOutput modules.
+Before creating custom implementations, it's a good idea to check if there's already a built-in component that can be
+used instead. This will save you time and effort in the long run. When you do create custom implementations, read
+through the API documentation belonging to the Step and StepOutput modules.
## Contributing
@@ -409,21 +398,21 @@ through the API documentation belonging to the Step and StepOutput modules.
We welcome contributions to our project! Here's a brief overview of our development process:
-* __Code Standards__: We use `pylint`, `black`, and `mypy` to maintain code standards. Please ensure your code passes
- these checks by running `make check`. No errors or warnings should be reported by the linter before you submit a pull
+- __Code Standards__: We use `pylint`, `black`, and `mypy` to maintain code standards. Please ensure your code passes
+ these checks by running `make check`. No errors or warnings should be reported by the linter before you submit a pull
request.
-* __Testing__: We use `pytest` for testing. Run the tests with `make test` and ensure all tests pass before submitting
+- __Testing__: We use `pytest` for testing. Run the tests with `make test` and ensure all tests pass before submitting
a pull request.
-* __Release Process__: We aim for frequent releases. Typically when we have a new feature or bugfix, a developer with
+- __Release Process__: We aim for frequent releases. Typically when we have a new feature or bugfix, a developer with
admin rights will create a new release on GitHub and publish the new version to PyPI.
-For more detailed information, please refer to our [contribution guidelines](./docs/contribute.md). We also adhere to
+For more detailed information, please refer to our [contribution guidelines](./docs/contribute.md). We also adhere to
[Nike's Code of Conduct](https://github.com/Nike-Inc/nike-inc.github.io/blob/master/CONDUCT.md) and [Nike's Individual Contributor License Agreement](https://www.clahub.com/agreements/Nike-Inc/fastbreak).
### Additional Resources
-* [General GitHub documentation](https://help.github.com/)
-* [GitHub pull request documentation](https://help.github.com/send-pull-requests/)
-* [Nike OSS](https://nike-inc.github.io/)
+- [General GitHub documentation](https://help.github.com/)
+- [GitHub pull request documentation](https://help.github.com/send-pull-requests/)
+- [Nike OSS](https://nike-inc.github.io/)
diff --git a/koheesio/__about__.py b/koheesio/__about__.py
index 586ac232..85772490 100644
--- a/koheesio/__about__.py
+++ b/koheesio/__about__.py
@@ -12,13 +12,16 @@
LICENSE_INFO = "Licensed as Apache 2.0"
SOURCE = "https://github.com/Nike-Inc/koheesio"
-__version__ = "0.7.0.dev0"
-__logo__ = 75, (
- b"\x1f\x8b\x08\x00TiGf\x02\xff}\x91\xbb\r\xc30\x0cD{Nq\x1bh\n\x01\x16R\xa4pK@\x8bh\xf8\xe8\xf8\x89\xe9\x04\xf0\x15"
- b"\xc4\x91\x10\x9f(J`z\xbd4B\xea8J\xf2\xa01T\x02\x01,\x0b\x85Q\x92\x07\xe9\x9cK\x92\xd1,\xe0mRBL\x9c\xa6\x9b\xee"
- b"\xeet)\x07Av\xc9/\x0b\x98\x93\xb4=\xd1v\xa4\xf5NG\xc6\xe5\xce\x93nk\x8d\x81\xf5\xed\x92\x80AmC\xbb\xde,.\x7f\x1fc"
- b"\x0fU\xa79\x19\x82\x16]\x1248\x8f\xa5\x7f\x1c|\x92\xe2\xb8\xa59\xfd\xa5\x86\x8b.I\x9a\xf3\xd4W\x80\x8a\xd3\x9e"
- b"\xfb\xba\\\xecm\x9f#\xee\xea\x92}M+\xffb\xb7\xb2\xc4\xc4K\x88Zui\xda\xedD\xfb\x00\xcfU6\xd3_\x02\x00\x00"
+__version__ = "0.7.0.dev1"
+__logo__ = (
+ 75,
+ (
+ b"\x1f\x8b\x08\x00TiGf\x02\xff}\x91\xbb\r\xc30\x0cD{Nq\x1bh\n\x01\x16R\xa4pK@\x8bh\xf8\xe8\xf8\x89\xe9\x04\xf0\x15"
+ b"\xc4\x91\x10\x9f(J`z\xbd4B\xea8J\xf2\xa01T\x02\x01,\x0b\x85Q\x92\x07\xe9\x9cK\x92\xd1,\xe0mRBL\x9c\xa6\x9b\xee"
+ b"\xeet)\x07Av\xc9/\x0b\x98\x93\xb4=\xd1v\xa4\xf5NG\xc6\xe5\xce\x93nk\x8d\x81\xf5\xed\x92\x80AmC\xbb\xde,.\x7f\x1fc"
+ b"\x0fU\xa79\x19\x82\x16]\x1248\x8f\xa5\x7f\x1c|\x92\xe2\xb8\xa59\xfd\xa5\x86\x8b.I\x9a\xf3\xd4W\x80\x8a\xd3\x9e"
+ b"\xfb\xba\\\xecm\x9f#\xee\xea\x92}M+\xffb\xb7\xb2\xc4\xc4K\x88Zui\xda\xedD\xfb\x00\xcfU6\xd3_\x02\x00\x00"
+ ),
)
__short_description__ = __doc__.split("\n", maxsplit=1)[0]
__about__ = f"""Koheesio -v{__version__}
diff --git a/koheesio/__init__.py b/koheesio/__init__.py
index ac44970f..e25d424e 100644
--- a/koheesio/__init__.py
+++ b/koheesio/__init__.py
@@ -1,8 +1,13 @@
# pragma: no cover
+
+import os
+
from koheesio.__about__ import __version__, _about
from koheesio.models import BaseModel, ExtraParamsMixin
from koheesio.steps import Step, StepOutput
+from koheesio.utils import convert_str_to_bool
+_koheesio_print_logo = convert_str_to_bool(os.environ.get("KOHEESIO__PRINT_LOGO", "True"))
_logo_printed = False
ABOUT = _about()
VERSION = __version__
@@ -12,7 +17,9 @@
def print_logo():
global _logo_printed
- if not _logo_printed:
+ global _koheesio_print_logo
+
+ if not _logo_printed and _koheesio_print_logo:
print(ABOUT)
_logo_printed = True
diff --git a/koheesio/context.py b/koheesio/context.py
index efc01e03..0f4b69e3 100644
--- a/koheesio/context.py
+++ b/koheesio/context.py
@@ -14,9 +14,9 @@
from __future__ import annotations
import re
+from typing import Any, Dict, Union
from collections.abc import Mapping
from pathlib import Path
-from typing import Any, Dict, Union
import jsonpickle
import tomli
diff --git a/koheesio/logger.py b/koheesio/logger.py
index 8035db60..0ddfe03c 100644
--- a/koheesio/logger.py
+++ b/koheesio/logger.py
@@ -33,8 +33,8 @@
import logging
import os
import sys
-from logging import Formatter, Logger, getLogger
from typing import Any, Dict, Generator, Generic, List, Optional, Tuple, TypeVar
+from logging import Formatter, Logger, getLogger
from uuid import uuid4
from warnings import warn
diff --git a/koheesio/models/__init__.py b/koheesio/models/__init__.py
index 525286f0..b405f65a 100644
--- a/koheesio/models/__init__.py
+++ b/koheesio/models/__init__.py
@@ -1,4 +1,4 @@
-""" Models package creates models that can be used to base other classes on.
+"""Models package creates models that can be used to base other classes on.
- Every model should be at least a pydantic BaseModel, but can also be a Step, or a StepOutput.
- Every model is expected to be an ABC (Abstract Base Class)
@@ -9,26 +9,21 @@
Transformation and Reader classes.
"""
-# pylint: disable=redefined-builtin
+from typing import Annotated, Any, Dict, List, Optional, Union
from abc import ABC
from functools import cached_property
from pathlib import Path
-from typing import Annotated, Any, Dict, List, Optional, Union
# to ensure that koheesio.models is a drop in replacement for pydantic
-from pydantic import * # noqa
from pydantic import BaseModel as PydanticBaseModel
-
-# noinspection PyUnresolvedReferences, PyProtectedMember
+from pydantic import * # noqa
from pydantic._internal._generics import PydanticGenericMetadata
-
-# noinspection PyUnresolvedReferences, PyProtectedMember
from pydantic._internal._model_construction import ModelMetaclass
from koheesio.context import Context
from koheesio.logger import Logger, LoggingFactory
-# pylint: enable=redefined-builtin
+__all__ = ["BaseModel", "ExtraParamsMixin", "ListOfColumns", "ModelMetaclass", "PydanticGenericMetadata"]
# pylint: disable=function-redefined
@@ -126,10 +121,12 @@ class BaseModel(PydanticBaseModel, ABC):
```python
from koheesio.models import BaseModel
+
class Person(BaseModel):
name: str
age: int
+
# Using the lazy method to create an instance without immediate validation
person = Person.lazy()
@@ -257,6 +254,7 @@ def from_context(cls, context: Context) -> BaseModel:
class SomeStep(BaseModel):
foo: str
+
context = Context(foo="bar")
some_step = SomeStep.from_context(context)
print(some_step.foo) # prints 'bar'
@@ -605,6 +603,7 @@ class FooModel(BaseModel):
foo: str
lorem: str
+
foo_model = FooModel.lazy()
foo_model.foo = "bar"
foo_model.lorem = "ipsum"
diff --git a/koheesio/models/sql.py b/koheesio/models/sql.py
index 37207a76..eacb100e 100644
--- a/koheesio/models/sql.py
+++ b/koheesio/models/sql.py
@@ -1,8 +1,8 @@
"""This module contains the base class for SQL steps."""
+from typing import Any, Dict, Optional, Union
from abc import ABC
from pathlib import Path
-from typing import Any, Dict, Optional, Union
from koheesio.models import ExtraParamsMixin, Field, model_validator
from koheesio.steps import Step
diff --git a/koheesio/steps/asyncio/__init__.py b/koheesio/steps/asyncio/__init__.py
index b501c1de..4efdc38c 100644
--- a/koheesio/steps/asyncio/__init__.py
+++ b/koheesio/steps/asyncio/__init__.py
@@ -2,8 +2,8 @@
This module provides classes for asynchronous steps in the koheesio package.
"""
-from asyncio import iscoroutine
from typing import Dict, Union
+from asyncio import iscoroutine
from koheesio.steps.step import Step, StepMetaClass, StepOutput
@@ -64,9 +64,7 @@ def merge(self, other: Union[Dict, StepOutput]):
For example:
```python
step_output = StepOutput(foo="bar")
- step_output.merge(
- {"lorem": "ipsum"}
- ) # step_output will now contain {'foo': 'bar', 'lorem': 'ipsum'}
+ step_output.merge({"lorem": "ipsum"}) # step_output will now contain {'foo': 'bar', 'lorem': 'ipsum'}
```
Functionally similar to adding two dicts together; like running `{**dict_a, **dict_b}`.
diff --git a/koheesio/steps/asyncio/http.py b/koheesio/steps/asyncio/http.py
index 015877fc..640e1cd7 100644
--- a/koheesio/steps/asyncio/http.py
+++ b/koheesio/steps/asyncio/http.py
@@ -12,6 +12,7 @@
import yarl
from aiohttp import BaseConnector, ClientSession, TCPConnector
from aiohttp_retry import ExponentialRetry, RetryClient, RetryOptionsBase
+
from pydantic import Field, SecretStr, field_validator, model_validator
from koheesio.models import ExtraParamsMixin
diff --git a/koheesio/steps/delta.py b/koheesio/steps/delta.py
index 528739ff..fb6edf04 100644
--- a/koheesio/steps/delta.py
+++ b/koheesio/steps/delta.py
@@ -6,6 +6,7 @@
from typing import Dict, List, Optional, Union
from py4j.protocol import Py4JJavaError # type: ignore
+
from pyspark.sql import DataFrame
from pyspark.sql.types import DataType
diff --git a/koheesio/steps/http.py b/koheesio/steps/http.py
index 16d4bf6e..3ffe8c48 100644
--- a/koheesio/steps/http.py
+++ b/koheesio/steps/http.py
@@ -13,8 +13,8 @@
"""
import json
-from enum import Enum
from typing import Any, Dict, List, Optional, Union
+from enum import Enum
import requests
diff --git a/koheesio/steps/integrations/box.py b/koheesio/steps/integrations/box.py
index bb52a6ec..fa29c482 100644
--- a/koheesio/steps/integrations/box.py
+++ b/koheesio/steps/integrations/box.py
@@ -11,15 +11,16 @@
"""
import re
+from typing import Any, Dict, Optional, Union
from abc import ABC, abstractmethod
from datetime import datetime
from io import BytesIO
from pathlib import PurePath
-from typing import Any, Dict, Optional, Union
from boxsdk import Client, JWTAuth
from boxsdk.object.file import File
from boxsdk.object.folder import Folder
+
from pyspark.sql import DataFrame
from pyspark.sql.functions import expr, lit
from pyspark.sql.types import StructType
@@ -609,16 +610,12 @@ class BoxFileWriter(BoxFolderBase):
from koheesio.steps.integrations.box import BoxFileWriter
auth_params = {...}
- f1 = BoxFileWriter(
- **auth_params, path="/foo/bar", file="path/to/my/file.ext"
- ).execute()
+ f1 = BoxFileWriter(**auth_params, path="/foo/bar", file="path/to/my/file.ext").execute()
# or
import io
b = io.BytesIO(b"my-sample-data")
- f2 = BoxFileWriter(
- **auth_params, path="/foo/bar", file=b, name="file.ext"
- ).execute()
+ f2 = BoxFileWriter(**auth_params, path="/foo/bar", file=b, name="file.ext").execute()
```
"""
diff --git a/koheesio/steps/integrations/dq/spark_expectations.py b/koheesio/steps/integrations/dq/spark_expectations.py
index 8724db43..687ccdc2 100644
--- a/koheesio/steps/integrations/dq/spark_expectations.py
+++ b/koheesio/steps/integrations/dq/spark_expectations.py
@@ -4,14 +4,16 @@
from typing import Any, Dict, Optional, Union
-from pydantic import Field
-from pyspark.sql import DataFrame
from spark_expectations.config.user_config import Constants as user_config
from spark_expectations.core.expectations import (
SparkExpectations,
WrappedDataFrameWriter,
)
+from pydantic import Field
+
+from pyspark.sql import DataFrame
+
from koheesio.steps.transformations import Transformation
from koheesio.steps.writers import BatchOutputMode
diff --git a/koheesio/steps/integrations/notifications/slack.py b/koheesio/steps/integrations/notifications/slack.py
index 8d1b9278..e0689cb7 100644
--- a/koheesio/steps/integrations/notifications/slack.py
+++ b/koheesio/steps/integrations/notifications/slack.py
@@ -3,9 +3,9 @@
"""
import json
+from typing import Any, Dict, Optional
from datetime import datetime
from textwrap import dedent
-from typing import Any, Dict, Optional
from koheesio.models import ConfigDict, Field
from koheesio.steps.http import HttpPostStep
diff --git a/koheesio/steps/integrations/secrets/__init__.py b/koheesio/steps/integrations/secrets/__init__.py
index 38047f42..0740aeaa 100644
--- a/koheesio/steps/integrations/secrets/__init__.py
+++ b/koheesio/steps/integrations/secrets/__init__.py
@@ -3,8 +3,8 @@
Contains abstract class for various secret integrations also known as SecretContext.
"""
-from abc import ABC, abstractmethod
from typing import Optional
+from abc import ABC, abstractmethod
from koheesio.context import Context
from koheesio.models import Field, SecretStr
diff --git a/koheesio/steps/integrations/snowflake.py b/koheesio/steps/integrations/snowflake.py
index f143d971..88ac8ce4 100644
--- a/koheesio/steps/integrations/snowflake.py
+++ b/koheesio/steps/integrations/snowflake.py
@@ -41,10 +41,10 @@
"""
import json
+from typing import Any, Dict, List, Optional, Set, Union
from abc import ABC
from copy import deepcopy
from textwrap import dedent
-from typing import Any, Dict, List, Optional, Set, Union
from pyspark.sql import DataFrame, Window
from pyspark.sql import functions as f
diff --git a/koheesio/steps/integrations/sso/okta.py b/koheesio/steps/integrations/sso/okta.py
index 5c06246c..4a0e840e 100644
--- a/koheesio/steps/integrations/sso/okta.py
+++ b/koheesio/steps/integrations/sso/okta.py
@@ -4,8 +4,8 @@
from __future__ import annotations
-from logging import Filter
from typing import Dict, Optional, Union
+from logging import Filter
from requests import HTTPError
diff --git a/koheesio/steps/readers/__init__.py b/koheesio/steps/readers/__init__.py
index 905912b9..a2ddd816 100644
--- a/koheesio/steps/readers/__init__.py
+++ b/koheesio/steps/readers/__init__.py
@@ -6,8 +6,8 @@
[reference/concepts/steps/readers](../../../reference/concepts/readers.md) section of the Koheesio documentation.
"""
-from abc import ABC, abstractmethod
from typing import Optional
+from abc import ABC, abstractmethod
from pyspark.sql import DataFrame
diff --git a/koheesio/steps/readers/databricks/autoloader.py b/koheesio/steps/readers/databricks/autoloader.py
index 9a63d728..9922c8ae 100644
--- a/koheesio/steps/readers/databricks/autoloader.py
+++ b/koheesio/steps/readers/databricks/autoloader.py
@@ -3,8 +3,8 @@
Autoloader can ingest JSON, CSV, PARQUET, AVRO, ORC, TEXT, and BINARYFILE file formats.
"""
-from enum import Enum
from typing import Dict, Optional, Union
+from enum import Enum
from koheesio.models import Field, field_validator
from koheesio.steps.readers import Reader
diff --git a/koheesio/steps/readers/file_loader.py b/koheesio/steps/readers/file_loader.py
index f3bc51c2..1ed0f7f7 100644
--- a/koheesio/steps/readers/file_loader.py
+++ b/koheesio/steps/readers/file_loader.py
@@ -30,9 +30,9 @@
[official documentation](https://spark.apache.org/docs/latest/sql-data-sources.html).
"""
+from typing import Optional, Union
from enum import Enum
from pathlib import Path
-from typing import Optional, Union
from pyspark.sql.types import StructType
@@ -80,9 +80,7 @@ class FileLoader(Reader, ExtraParamsMixin):
Example:
```python
- reader = FileLoader(
- path="path/to/textfile.txt", format="text", header=True, lineSep="\n"
- )
+ reader = FileLoader(path="path/to/textfile.txt", format="text", header=True, lineSep="\n")
```
For more information about the available options, see Spark's
diff --git a/koheesio/steps/readers/memory.py b/koheesio/steps/readers/memory.py
index f667ee85..ed7e5e5e 100644
--- a/koheesio/steps/readers/memory.py
+++ b/koheesio/steps/readers/memory.py
@@ -3,9 +3,9 @@
"""
import json
+from typing import Any, Dict, Optional, Union
from enum import Enum
from functools import partial
-from typing import Any, Dict, Optional, Union
from pyspark.rdd import RDD
from pyspark.sql import DataFrame
diff --git a/koheesio/steps/readers/rest_api.py b/koheesio/steps/readers/rest_api.py
index 2d0286f0..a71c1907 100644
--- a/koheesio/steps/readers/rest_api.py
+++ b/koheesio/steps/readers/rest_api.py
@@ -12,6 +12,7 @@
from typing import List, Tuple, Union
from pydantic import Field, InstanceOf
+
from pyspark.sql.types import AtomicType, StructType
from koheesio.steps.asyncio.http import AsyncHttpGetStep
@@ -66,9 +67,7 @@ class RestApiReader(Reader):
pages=3,
session=session,
)
- task = RestApiReader(
- transport=transport, spark_schema="id: int, page:int, value: string"
- )
+ task = RestApiReader(transport=transport, spark_schema="id: int, page:int, value: string")
task.execute()
all_data = [row.asDict() for row in task.output.df.collect()]
```
@@ -93,9 +92,7 @@ class RestApiReader(Reader):
connector=connector,
)
- task = RestApiReader(
- transport=transport, spark_schema="id: int, page:int, value: string"
- )
+ task = RestApiReader(transport=transport, spark_schema="id: int, page:int, value: string")
task.execute()
all_data = [row.asDict() for row in task.output.df.collect()]
```
diff --git a/koheesio/steps/spark.py b/koheesio/steps/spark.py
index a21dd27c..daa8e334 100644
--- a/koheesio/steps/spark.py
+++ b/koheesio/steps/spark.py
@@ -4,10 +4,11 @@
from __future__ import annotations
-from abc import ABC
from typing import Optional
+from abc import ABC
from pydantic import Field
+
from pyspark.sql import Column
from pyspark.sql import DataFrame as PySparkSQLDataFrame
from pyspark.sql import SparkSession as OriginalSparkSession
diff --git a/koheesio/steps/step.py b/koheesio/steps/step.py
index 062ea4f3..ee1e8e15 100644
--- a/koheesio/steps/step.py
+++ b/koheesio/steps/step.py
@@ -8,9 +8,9 @@
import json
import sys
import warnings
+from typing import Any
from abc import abstractmethod
from functools import partialmethod, wraps
-from typing import Any
import yaml
@@ -51,6 +51,14 @@ class StepMetaClass(ModelMetaclass):
allowing for the execute method to be auto-decorated with do_execute
"""
+ # Solution to overcome issue with python>=3.11,
+ # When partialmethod is forgetting that _execute_wrapper
+ # is a method of wrapper, and it needs to pass that in as the first arg.
+ # https://github.com/python/cpython/issues/99152
+ class _partialmethod_with_self(partialmethod):
+ def __get__(self, obj, cls=None):
+ return self._make_unbound_method().__get__(obj, cls)
+
# Unique object to mark a function as wrapped
_step_execute_wrapper_sentinel = object()
diff --git a/koheesio/steps/transformations/__init__.py b/koheesio/steps/transformations/__init__.py
index d99455d1..44e6635e 100644
--- a/koheesio/steps/transformations/__init__.py
+++ b/koheesio/steps/transformations/__init__.py
@@ -21,8 +21,8 @@
Extended ColumnsTransformation class with an additional `target_column` field
"""
-from abc import ABC, abstractmethod
from typing import List, Optional, Union
+from abc import ABC, abstractmethod
from pyspark.sql import Column
from pyspark.sql import functions as f
@@ -58,9 +58,7 @@ class Transformation(SparkStep, ABC):
class AddOne(Transformation):
def execute(self):
- self.output.df = self.df.withColumn(
- "new_column", f.col("old_column") + 1
- )
+ self.output.df = self.df.withColumn("new_column", f.col("old_column") + 1)
```
In the example above, the `execute` method is implemented to add 1 to the values of the `old_column` and store the
diff --git a/koheesio/steps/transformations/arrays.py b/koheesio/steps/transformations/arrays.py
index d97ef73b..bdc8124f 100644
--- a/koheesio/steps/transformations/arrays.py
+++ b/koheesio/steps/transformations/arrays.py
@@ -23,9 +23,9 @@
Base class for all transformations that operate on columns and have a target column.
"""
+from typing import Any
from abc import ABC
from functools import reduce
-from typing import Any
from pyspark.sql import Column
from pyspark.sql import functions as F
diff --git a/koheesio/steps/transformations/camel_to_snake.py b/koheesio/steps/transformations/camel_to_snake.py
index 1cc9413d..70aebbca 100644
--- a/koheesio/steps/transformations/camel_to_snake.py
+++ b/koheesio/steps/transformations/camel_to_snake.py
@@ -48,9 +48,7 @@ class CamelToSnakeTransformation(ColumnsTransformation):
| ... | ... |
```python
- output_df = CamelToSnakeTransformation(column="camelCaseColumn").transform(
- input_df
- )
+ output_df = CamelToSnakeTransformation(column="camelCaseColumn").transform(input_df)
```
__output_df:__
diff --git a/koheesio/steps/transformations/date_time/__init__.py b/koheesio/steps/transformations/date_time/__init__.py
index 5e57f99c..1026d175 100644
--- a/koheesio/steps/transformations/date_time/__init__.py
+++ b/koheesio/steps/transformations/date_time/__init__.py
@@ -2,6 +2,8 @@
from typing import Optional, Union
+from pytz import all_timezones_set
+
from pyspark.sql import Column
from pyspark.sql import functions as f
from pyspark.sql.functions import (
@@ -13,7 +15,6 @@
to_utc_timestamp,
when,
)
-from pytz import all_timezones_set
from koheesio.models import Field, field_validator, model_validator
from koheesio.steps.transformations import ColumnsTransformationWithTarget
diff --git a/koheesio/steps/transformations/date_time/interval.py b/koheesio/steps/transformations/date_time/interval.py
index 84dccfe0..1f872bcb 100644
--- a/koheesio/steps/transformations/date_time/interval.py
+++ b/koheesio/steps/transformations/date_time/interval.py
@@ -99,14 +99,10 @@
DateTimeAddInterval,
)
-input_df = spark.createDataFrame(
- [(1, "2022-01-01 00:00:00")], ["id", "my_column"]
-)
+input_df = spark.createDataFrame([(1, "2022-01-01 00:00:00")], ["id", "my_column"])
# add 1 day to my_column and store the result in a new column called 'one_day_later'
-output_df = DateTimeAddInterval(
- column="my_column", target_column="one_day_later", interval="1 day"
-).transform(input_df)
+output_df = DateTimeAddInterval(column="my_column", target_column="one_day_later", interval="1 day").transform(input_df)
```
__output_df__:
diff --git a/koheesio/steps/transformations/lookup.py b/koheesio/steps/transformations/lookup.py
index 34c11b22..ac8c926b 100644
--- a/koheesio/steps/transformations/lookup.py
+++ b/koheesio/steps/transformations/lookup.py
@@ -9,8 +9,8 @@
DataframeLookup
"""
-from enum import Enum
from typing import List, Optional, Union
+from enum import Enum
import pyspark.sql.functions as f
from pyspark.sql import Column, DataFrame
@@ -102,9 +102,7 @@ class DataframeLookup(Transformation):
df=left_df,
other=right_df,
on=JoinMapping(source_column="id", joined_column="id"),
- targets=TargetColumn(
- target_column="value", target_column_alias="right_value"
- ),
+ targets=TargetColumn(target_column="value", target_column_alias="right_value"),
how=JoinType.LEFT,
)
diff --git a/koheesio/steps/transformations/strings/change_case.py b/koheesio/steps/transformations/strings/change_case.py
index 26c1b743..62bbb003 100644
--- a/koheesio/steps/transformations/strings/change_case.py
+++ b/koheesio/steps/transformations/strings/change_case.py
@@ -51,9 +51,7 @@ class LowerCase(ColumnsTransformationWithTarget):
| Beans| 1600| USA|
```python
- output_df = LowerCase(
- column="product", target_column="product_lower"
- ).transform(df)
+ output_df = LowerCase(column="product", target_column="product_lower").transform(df)
```
__output_df:__
@@ -109,9 +107,7 @@ class UpperCase(LowerCase):
| Beans| 1600| USA|
```python
- output_df = UpperCase(
- column="product", target_column="product_upper"
- ).transform(df)
+ output_df = UpperCase(column="product", target_column="product_upper").transform(df)
```
__output_df:__
@@ -162,9 +158,7 @@ class TitleCase(LowerCase):
| Beans| 1600| USA|
```python
- output_df = TitleCase(
- column="product", target_column="product_title"
- ).transform(df)
+ output_df = TitleCase(column="product", target_column="product_title").transform(df)
```
__output_df:__
diff --git a/koheesio/steps/transformations/strings/split.py b/koheesio/steps/transformations/strings/split.py
index a6d80ab6..68b3e96a 100644
--- a/koheesio/steps/transformations/strings/split.py
+++ b/koheesio/steps/transformations/strings/split.py
@@ -51,9 +51,7 @@ class SplitAll(ColumnsTransformationWithTarget):
| Beans| 1600| USA|
```python
- output_df = SplitColumn(
- column="product", target_column="split", split_pattern=" "
- ).transform(input_df)
+ output_df = SplitColumn(column="product", target_column="split", split_pattern=" ").transform(input_df)
```
__output_df:__
@@ -109,9 +107,7 @@ class SplitAtFirstMatch(SplitAll):
| Beans| 1600| USA|
```python
- output_df = SplitColumn(
- column="product", target_column="split_first", split_pattern="an"
- ).transform(input_df)
+ output_df = SplitColumn(column="product", target_column="split_first", split_pattern="an").transform(input_df)
```
__output_df:__
diff --git a/koheesio/steps/transformations/strings/trim.py b/koheesio/steps/transformations/strings/trim.py
index d77ebd0d..db0e3117 100644
--- a/koheesio/steps/transformations/strings/trim.py
+++ b/koheesio/steps/transformations/strings/trim.py
@@ -57,9 +57,7 @@ class Trim(ColumnsTransformationWithTarget):
### Trim whitespace from the beginning of a string
```python
- output_df = Trim(
- column="column", target_column="trimmed_column", direction="left"
- ).transform(input_df)
+ output_df = Trim(column="column", target_column="trimmed_column", direction="left").transform(input_df)
```
__output_df:__
@@ -86,9 +84,7 @@ class Trim(ColumnsTransformationWithTarget):
### Trim whitespace from the end of a string
```python
- output_df = Trim(
- column="column", target_column="trimmed_column", direction="right"
- ).transform(input_df)
+ output_df = Trim(column="column", target_column="trimmed_column", direction="right").transform(input_df)
```
__output_df:__
diff --git a/koheesio/steps/transformations/transform.py b/koheesio/steps/transformations/transform.py
index f97ab70e..07771c7c 100644
--- a/koheesio/steps/transformations/transform.py
+++ b/koheesio/steps/transformations/transform.py
@@ -6,8 +6,8 @@
from __future__ import annotations
-from functools import partial
from typing import Callable, Dict
+from functools import partial
from pyspark.sql import DataFrame
diff --git a/koheesio/steps/transformations/uuid5.py b/koheesio/steps/transformations/uuid5.py
index 79e62e90..3e2df23a 100644
--- a/koheesio/steps/transformations/uuid5.py
+++ b/koheesio/steps/transformations/uuid5.py
@@ -110,9 +110,7 @@ class HashUUID5(Transformation):
In code:
```python
- HashUUID5(source_columns=["id", "string"], target_column="uuid5").transform(
- input_df
- )
+ HashUUID5(source_columns=["id", "string"], target_column="uuid5").transform(input_df)
```
In this example, the `id` and `string` columns are concatenated and hashed using the UUID5 algorithm. The result is
diff --git a/koheesio/steps/writers/__init__.py b/koheesio/steps/writers/__init__.py
index bb29e537..87e2a30a 100644
--- a/koheesio/steps/writers/__init__.py
+++ b/koheesio/steps/writers/__init__.py
@@ -1,8 +1,8 @@
"""The Writer class is used to write the DataFrame to a target."""
+from typing import Optional
from abc import ABC, abstractmethod
from enum import Enum
-from typing import Optional
from pyspark.sql import DataFrame
diff --git a/koheesio/steps/writers/buffer.py b/koheesio/steps/writers/buffer.py
index 065ced94..e9128719 100644
--- a/koheesio/steps/writers/buffer.py
+++ b/koheesio/steps/writers/buffer.py
@@ -14,14 +14,16 @@
"""
import gzip
+from typing import Literal, Optional
from abc import ABC
from functools import partial
from os import linesep
from tempfile import SpooledTemporaryFile
-from typing import Literal, Optional
from pandas._typing import CompressionOptions as PandasCompressionOptions
+
from pydantic import InstanceOf
+
from pyspark import pandas
from koheesio.models import ExtraParamsMixin, Field, constr
@@ -129,7 +131,7 @@ class PandasCsvBufferWriter(BufferWriter, ExtraParamsMixin):
|------------------|-----------------|-----------------|----------------|-------------------|------------------|-------|
| maxRecordsPerFile| ... | chunksize | None | max_records_per_file | ... | Spark property name: spark.sql.files.maxRecordsPerFile |
| sep | , | sep | , | sep | , | |
- | lineSep | `\\n ` | line_terminator | os.linesep | lineSep (alias=line_terminator) | \\n | |
+ | lineSep | `\\n ` | line_terminator | os.linesep | lineSep (alias=line_terminator) | \\n | |
| N/A | ... | index | True | index | False | Determines whether row labels (index) are included in the output |
| header | False | header | True | header | True | |
| quote | " | quotechar | " | quote (alias=quotechar) | " | |
@@ -258,8 +260,19 @@ class Output(BufferWriter.Output):
pandas_df: Optional[pandas.DataFrame] = Field(None, description="The Pandas DataFrame that was written")
- def get_options(self):
+ def get_options(self, options_type: str = "csv"):
"""Returns the options to pass to Pandas' to_csv() method."""
+ try:
+ import pandas as _pd
+
+ # Get the pandas version as a tuple of integers
+ pandas_version = tuple(int(i) for i in _pd.__version__.split("."))
+ except ImportError:
+ raise ImportError("Pandas is required to use this writer")
+
+ # Use line_separator for pandas 2.0.0 and later
+ line_sep_option_naming = "line_separator" if pandas_version >= (2, 0, 0) else "line_terminator"
+
csv_options = {
"header": self.header,
"sep": self.sep,
@@ -267,12 +280,18 @@ def get_options(self):
"doublequote": self.quoteAll,
"escapechar": self.escape,
"na_rep": self.emptyValue or self.nullValue,
- "line_terminator": self.lineSep,
+ line_sep_option_naming: self.lineSep,
"index": self.index,
"date_format": self.timestampFormat,
"compression": self.compression,
**self.params,
}
+
+ if options_type == "spark":
+ csv_options["lineterminator"] = csv_options.pop(line_sep_option_naming)
+ elif options_type == "kohesio_pandas_buffer_writer":
+ csv_options["line_terminator"] = csv_options.pop(line_sep_option_naming)
+
return csv_options
def execute(self):
@@ -284,7 +303,7 @@ def execute(self):
# create csv file in memory
file_buffer = self.output.buffer
- self.output.pandas_df.to_csv(file_buffer, **self.get_options())
+ self.output.pandas_df.to_csv(file_buffer, **self.get_options(options_type="spark"))
# pylint: disable=C0301
diff --git a/koheesio/steps/writers/delta/batch.py b/koheesio/steps/writers/delta/batch.py
index 01cce76a..5a089b26 100644
--- a/koheesio/steps/writers/delta/batch.py
+++ b/koheesio/steps/writers/delta/batch.py
@@ -34,11 +34,12 @@
```
"""
-from functools import partial
from typing import List, Optional, Set, Type, Union
+from functools import partial
from delta.tables import DeltaMergeBuilder, DeltaTable
from py4j.protocol import Py4JError
+
from pyspark.sql import DataFrameWriter
from koheesio.models import ExtraParamsMixin, Field, field_validator
diff --git a/koheesio/steps/writers/delta/scd.py b/koheesio/steps/writers/delta/scd.py
index 3ded7901..2c96a24f 100644
--- a/koheesio/steps/writers/delta/scd.py
+++ b/koheesio/steps/writers/delta/scd.py
@@ -15,11 +15,13 @@
"""
-from logging import Logger
from typing import List, Optional
+from logging import Logger
from delta.tables import DeltaMergeBuilder, DeltaTable
+
from pydantic import InstanceOf
+
from pyspark.sql import Column
from pyspark.sql import functions as F
from pyspark.sql.types import DateType, TimestampType
diff --git a/koheesio/steps/writers/delta/stream.py b/koheesio/steps/writers/delta/stream.py
index f33dc555..d126f050 100644
--- a/koheesio/steps/writers/delta/stream.py
+++ b/koheesio/steps/writers/delta/stream.py
@@ -2,8 +2,8 @@
This module defines the DeltaTableStreamWriter class, which is used to write streaming dataframes to Delta tables.
"""
-from email.policy import default
from typing import Optional
+from email.policy import default
from pydantic import Field
diff --git a/koheesio/steps/writers/sftp.py b/koheesio/steps/writers/sftp.py
index 18773444..5ddcf237 100644
--- a/koheesio/steps/writers/sftp.py
+++ b/koheesio/steps/writers/sftp.py
@@ -14,9 +14,9 @@
import hashlib
import time
+from typing import Optional, Union
from enum import Enum
from pathlib import Path
-from typing import Optional, Union
from paramiko.sftp_client import SFTPClient
from paramiko.transport import Transport
@@ -383,7 +383,7 @@ class SendCsvToSftp(PandasCsvBufferWriter, SFTPWriter):
@model_validator(mode="after")
def set_up_buffer_writer(self) -> "SendCsvToSftp":
"""Set up the buffer writer, passing all CSV related options to it."""
- self.buffer_writer = PandasCsvBufferWriter(**self.get_options())
+ self.buffer_writer = PandasCsvBufferWriter(**self.get_options(options_type="kohesio_pandas_buffer_writer"))
return self
def execute(self):
diff --git a/koheesio/steps/writers/stream.py b/koheesio/steps/writers/stream.py
index db38867a..63e2f724 100644
--- a/koheesio/steps/writers/stream.py
+++ b/koheesio/steps/writers/stream.py
@@ -15,8 +15,8 @@ class to run a writer for each batch
function to be used as batch_function for StreamWriter (sub)classes
"""
-from abc import ABC, abstractmethod
from typing import Callable, Dict, Optional, Union
+from abc import ABC, abstractmethod
from pyspark.sql.streaming import DataStreamWriter, StreamingQuery
diff --git a/koheesio/utils/__init__.py b/koheesio/utils/__init__.py
index 61d2595c..e6f4d8a5 100644
--- a/koheesio/utils/__init__.py
+++ b/koheesio/utils/__init__.py
@@ -5,10 +5,10 @@
import inspect
import os
import uuid
+from typing import Any, Callable, Dict, Optional, Tuple
from functools import partial
from importlib import import_module
from pathlib import Path
-from typing import Any, Callable, Dict, Optional, Tuple
__all__ = [
"get_args_for_func",
diff --git a/pyproject.toml b/pyproject.toml
index 9734a79d..ed0c901e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -11,6 +11,7 @@ keywords = [
authors = [
# TODO: add other contributors
{ name = "Danny Meijer", email = "danny.meijer@nike.com" },
+ { name = "Mikita Sakalouski", email = "mikita.sakalouski@nike.com" },
]
classifiers = [
"Development Status :: 5 - Production/Stable",
@@ -19,7 +20,6 @@ classifiers = [
"Natural Language :: English",
"Operating System :: OS Independent",
"Programming Language :: Python",
- "Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
@@ -46,12 +46,30 @@ async_http = [
"nest-asyncio>=1.6.0",
]
box = ["boxsdk[jwt]==3.8.1"]
-pandas = ["pandas<2.0.0", "setuptools"]
+# SFTP dependencies in to_csv line_terminator
+pandas = ["pandas>=1.3", "setuptools"]
pyspark = ["pyspark>=3.2.0", "pyarrow>13"]
-se = ["spark-expectations>=1.1.0"]
+# FIXME: loose versioning in spark_expectations for pluggy module
+# se = ["spark-expectations>=1.1.0"]
+se = []
sftp = ["paramiko>=2.6.0"]
delta = ["delta-spark>=2.2"]
dev = ["black", "isort", "ruff", "mypy", "pylint", "colorama", "types-PyYAML"]
+test = [
+ "chispa",
+ "coverage[toml]",
+ "pytest",
+ "pytest-asyncio",
+ "pytest-cov",
+ "pytest-mock",
+ "pytest-order",
+ "pytest-sftpserver",
+ "pytest-xdist",
+ "pytest-randomly",
+ "requests_mock",
+ "time_machine",
+]
+
[project.urls]
Documentation = "https://github.com/Nike-Inc/koheesio#readme"
@@ -68,114 +86,6 @@ Source = "https://github.com/Nike-Inc/koheesio"
requires = ["hatchling"]
build-backend = "hatchling.build"
-[tool.hatch.version]
-description = """
-The version of the package is dynamically set and is maintained in the top-level `__init__.py` file of the package.
----
-Bump by running `hatch version` with one of the following release types:
-- `major` - breaking changes, i.e. 2.0.0
-- `minor` - new features, i.e. 1.1.0
-- `micro` `fix` or `patch` - bug fixes, i.e. 1.0.1
-- `a` or `alpha` - alpha release, i.e. 1.0.0a1
-- `b` or `beta` - beta release, i.e. 1.0.0b1
-- `c` `rc` `pre` or `preview` - release candidate, i.e. 1.0.0rc1
-- `r` `rev` or `post` - post release, i.e. 1.0.0.post1
-- `dev` - development release, i.e. 1.0.0.dev1
-"""
-path = "koheesio/__about__.py"
-
-[tool.hatch.envs.hatch-static-analysis.scripts]
-#TODO: move scripts from linting and style here
-format-check = "black --check --diff ."
-format-fix = "black ."
-lint-check = "ruff check ."
-lint-fix = "echo 'No formatting is required for this project.'"
-
-[tool.hatch.envs.default]
-description = """
-The default environment is used for development and general use.
----
-We use the `uv` installer by default. This is a superfast, Rust-based installer.
-Run `hatch run` to run scripts in the default environment.
-
-# Code Quality
-To check and format the codebase, we use:
- - `black` for code formatting
- - `isort` for import sorting (includes colorama for colored output)
- - `ruff` for linting.
- - `mypy` for static type checking.
- - `pylint` for code quality checks.
----
-There are several ways to run style checks and formatting:
-- `hatch run black-check` will check the codebase with black without applying fixes.
-- `hatch run black-fmt` will format the codebase using black.
-- `hatch run isort-check` will check the codebase with isort without applying fixes.
-- `hatch run isort-fmt` will format the codebase using isort.
-- `hatch run ruff-check` will check the codebase with ruff without applying fixes.
-- `hatch run ruff-fmt` will format the codebase using ruff.
-- `hatch run mypy-check` will check the codebase with mypy.
-- `hatch run pylint-check` will check the codebase with pylint.
-- `hatch run check` will run all the above checks (including pylint and mypy).
-- `hatch run fmt` or `hatch run fix` will format the codebase using black, isort, and ruff.
-- `hatch run lint` will run ruff, mypy, and pylint.
-
-# Testing and Coverage
-To run the test suite, use:
-- `hatch run all-tests` to run the full test suite.
-- `hatch run spark-tests` to run the Spark test suite.
-- `hatch run log-versions` to log the Python and PySpark versions.
-- `hatch run coverage` or `hatch run cov` to run the test suite with coverage.
-
-Note: the full test suite is will run all tests in all specified Python and PySpark versions. If you want to run tests
-against specific versions, you can add `+py=3.10` or `+version=pyspark34` to the command (replace the versions with the
-desired ones).
-
-For lighter / faster testing, use the `dev` environment with `hatch shell dev` and run the tests with `pytest` or use
-the `all-tests` or `spark-tests` command.
-"""
-installer = "uv"
-features = [
- "async",
- "async_http",
- "pyspark",
- "pandas",
- "sftp",
- "delta",
- "se",
- "box",
- "dev",
-]
-
-[tool.hatch.envs.default.scripts]
-# TODO: add scripts section based on Makefile
-# TODO: add bandit
-# TODO: move scripts from linting and style here
-# Code Quality commands
-black-check = "black --check --diff ."
-black-fmt = "black ."
-isort-check = "isort . --check --diff --color"
-isort-fmt = "isort ."
-ruff-check = "ruff check ."
-ruff-fmt = "ruff check . --fix"
-mypy-check = "mypy koheesio"
-pylint-check = "pylint --output-format=colorized -d W0511 koheesio"
-check = [
- "- black-check",
- "- isort-check",
- "- ruff-check",
- "- mypy-check",
- "- pylint-check",
-]
-fmt = ["black-fmt", "isort-fmt", "ruff-fmt"]
-fix = "fmt"
-lint = ["- ruff-fmt", "- mypy-check", "pylint-check"]
-# Testing and Coverage commands
-log-versions = "python --version && {env:HATCH_UV} pip freeze | grep pyspark"
-all-tests = "pytest test/ {env:HATCH_TEST_ARGS:} {args}"
-spark-tests = "pytest test/spark {env:HATCH_TEST_ARGS:} {args}"
-non-spark-tests = "pytest test/ {env:HATCH_TEST_ARGS:} {args} -k 'not spark'"
-coverage = "pytest test/ {env:HATCH_TEST_ARGS:} {args} --cov=koheesio --cov-report=html --cov-report=term-missing --cov-fail-under=90"
-cov = "coverage"
[tool.black]
line-length = 120
@@ -183,12 +93,11 @@ target-version = ['py39', 'py310', 'py311', 'py312']
include = '\.pyi?$'
extend-exclude = '''
/(
- | __research__
- | __notebooks__
- | test/data
+ | tests/_data
)/
'''
+
[tool.isort]
profile = "black"
skip = [
@@ -215,7 +124,20 @@ skip = [
"node_modules",
"venv",
]
-force_to_top = ["*"]
+force_to_top = ["__future__", "typing"]
+sections = [
+ "FUTURE",
+ "STDLIB",
+ "THIRDPARTY",
+ "PYDANTIC",
+ "PYSPARK",
+ "KOHEESIO",
+ "FIRSTPARTY",
+ "LOCALFOLDER",
+]
+known_pydantic = ["pydantic"]
+known_pyspark = ["pyspark"]
+known_koheesio = ["koheesio"]
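With the custom section order above, isort groups `pydantic` and `pyspark` imports into their own blocks between third-party and first-party code. A minimal sketch of the resulting import layout (this is the blank-line regrouping visible in the test-file hunks further down):

```python
# Illustrative import layout under the FUTURE/STDLIB/THIRDPARTY/PYDANTIC/PYSPARK/KOHEESIO order
from __future__ import annotations

import logging
from pathlib import Path

import pytest

from pydantic import BaseModel

from pyspark.sql import DataFrame

from koheesio.context import Context
```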
[tool.ruff]
# https://docs.astral.sh/ruff/configuration/#using-pyprojecttoml
@@ -316,10 +238,6 @@ unfixable = []
# Allow unused variables when underscore-prefixed.
dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
-[tool.ruff.lint.isort.sections]
-"koheesio" = ["koheesio"]
-"pyspark" = ["pyspark"]
-
[tool.mypy]
check_untyped_defs = false
disallow_untyped_calls = false
@@ -413,17 +331,71 @@ notes = ["FIXME", "TODO"]
[tool.pylint.refactoring]
max-nested-blocks = 3
+[tool.pytest.ini_options]
+markers = [
+    "default: added to all tests by default if no other marker except the standard pytest markers is present",
+ "spark: mark a test as a Spark test",
+ # "sftp: mark a test as an SFTP test",
+ # "se: mark a test as a Spark Expectations test",
+ # "box: mark a test as a Box test",
+ # "asyncio: mark a test as an asyncio test",
+ # "asyncio_http: mark a test as an asyncio HTTP test",
+]
+addopts = "-q --color=yes --order-scope=module"
+log_level = "CRITICAL"
+filterwarnings = [
+ # pyspark.pandas warnings
+ "ignore:distutils.*:DeprecationWarning:pyspark.pandas.*",
+ "ignore:'PYARROW_IGNORE_TIMEZONE'.*:UserWarning:pyspark.pandas.*",
+ "ignore:distutils.*:DeprecationWarning:pyspark.sql.pandas.*",
+ "ignore:is_datetime64tz_dtype.*:DeprecationWarning:pyspark.sql.pandas.*",
+ # Koheesio warnings
+ "ignore:DayTimeIntervalType .*:UserWarning:koheesio.steps.integrations.snowflake.*",
+ "ignore:RankDedup is deprecated. Use RowNumberDedup instead.:UserWarning:koheesio.steps.transformations.*",
+]
+
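The `spark` marker registered above is attached at module level by the renamed Spark test modules later in this diff; a minimal sketch of how a module opts in and how the marker is selected:

```python
# Sketch of a module-level Spark marker, mirroring the `pytestmark` lines added below.
import pytest

pytestmark = pytest.mark.spark  # every test in this module is treated as a Spark test


def test_something_with_spark():
    assert True


# Selecting by marker:
#   pytest -m spark          -> run only Spark-marked tests
#   pytest -m "not spark"    -> run everything else
```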
+[tool.coverage.run]
+branch = true
+source = ["koheesio"]
+
+[tool.coverage.report]
+exclude_lines = ["no cov", "if __name__ == .__main__.:", "if TYPE_CHECKING:"]
+omit = ["tests/*"]
+
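The `exclude_lines` patterns above drop lines that are never meant to execute under test from the coverage report; a small sketch of code that each pattern matches:

```python
# Lines matched by the exclude_lines patterns are not counted against coverage.
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # excluded: "if TYPE_CHECKING:"
    from pyspark.sql import DataFrame


def debug_helper() -> None:  # no cov  (excluded: "no cov")
    print("only used interactively")


if __name__ == "__main__":  # excluded: "if __name__ == .__main__.:"
    debug_helper()
```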
### ~~~~~~~~~~~~~~~~~~~~~ ###
# Unit Testing and Coverage #
### ~~~~~~~~~~~~~~~~~~~~~ ###
#
-#TODO: see if we can make this through hatch-test env (using hatch test command)
-[tool.hatch.envs.test]
+
+[tool.hatch.version]
+description = """
+The version of the package is dynamically set and is maintained in the `koheesio/__about__.py` file.
+---
+Bump by running `hatch version` with one of the following release types:
+- `major` - breaking changes, i.e. 2.0.0
+- `minor` - new features, i.e. 1.1.0
+- `micro` `fix` or `patch` - bug fixes, i.e. 1.0.1
+- `a` or `alpha` - alpha release, i.e. 1.0.0a1
+- `b` or `beta` - beta release, i.e. 1.0.0b1
+- `c` `rc` `pre` or `preview` - release candidate, i.e. 1.0.0rc1
+- `r` `rev` or `post` - post release, i.e. 1.0.0.post1
+- `dev` - development release, i.e. 1.0.0.dev1
+"""
+path = "koheesio/__about__.py"
+
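Hatch's file-based version source reads a `__version__` assignment from the path configured above. The file itself is not shown in this diff, so the sketch below is only an assumed layout:

```python
# koheesio/__about__.py (assumed layout; `hatch version` rewrites this value on bumps)
__version__ = "0.0.0"
```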
+[tool.hatch.envs.hatch-static-analysis.scripts]
+#TODO: move scripts from linting and style here
+format-check = "black --check --diff ."
+format-fix = "black ."
+lint-check = "ruff check ."
+lint-fix = "echo 'No formatting is required for this project.'"
+
+[tool.hatch.envs.hatch-test]
description = """
The test environment is used to run the test suite.
---
-- Run `hatch run test:all-tests` to run the full test suite.
+- Run `hatch test` to run the full test suite.
- Run `hatch run test:spark-tests` to run the Spark test suite.
- Run `hatch run test:log-versions` to log the Python and PySpark versions.
- You can test against specific Python and PySpark versions by adding `+py=3.10` or `+version=pyspark34` to the command
@@ -432,19 +404,6 @@ The test environment is used to run the test suite.
Note: Test env will run against all specified python and pyspark versions. Use the `dev` environment for development
and general use.
"""
-dependencies = [
- "chispa",
- "coverage[toml]",
- "pytest",
- "pytest-asyncio",
- "pytest-cov",
- "pytest-mock",
- "pytest-order",
- "pytest-sftpserver",
- "pytest-xdist",
- "requests_mock",
- "time_machine",
-]
features = [
"async",
"async_http",
@@ -455,30 +414,33 @@ features = [
"sftp",
"delta",
"dev",
+ "test",
]
-matrix = [
- { python = [
- "3.9",
- ], version = [
- "pyspark33",
- "pyspark34",
- ] },
- { python = [
- "3.10",
- ], version = [
- "pyspark33",
- "pyspark34",
- "pyspark35",
- ] },
- { python = [
- "3.11",
- "3.12",
- ], version = [
- "pyspark35",
- ] },
-]
-[tool.hatch.envs.test.overrides]
+parallel = true
+retries = 2
+retry-delay = 1
+
+[tool.hatch.envs.hatch-test.scripts]
+run = "pytest{env:HATCH_TEST_ARGS:} {args} -n auto"
+run-cov = "coverage run -m pytest{env:HATCH_TEST_ARGS:} {args}"
+cov-combine = "coverage combine"
+cov-report = "coverage report"
+
+
+[[tool.hatch.envs.hatch-test.matrix]]
+python = ["3.9"]
+version = ["pyspark33", "pyspark34"]
+
+[[tool.hatch.envs.hatch-test.matrix]]
+python = ["3.10"]
+version = ["pyspark33", "pyspark34", "pyspark35"]
+
+[[tool.hatch.envs.hatch-test.matrix]]
+python = ["3.11", "3.12"]
+version = ["pyspark35"]
+
+[tool.hatch.envs.hatch-test.overrides]
matrix.version.extra-dependencies = [
{ value = "pyspark>=3.3,<3.4", if = [
"pyspark33",
@@ -491,24 +453,114 @@ matrix.version.extra-dependencies = [
] },
]
-[tool.pytest.ini_options]
-addopts = "-q --color=yes --order-scope=module"
-log_level = "CRITICAL"
-testpaths = ["test"]
-
-#FIXME: Is it duplication of hatch coverage ?
-[tool.coverage.run]
-branch = true
-source = ["koheesio"]
-
-[tool.coverage.report]
-exclude_lines = ["no cov", "if __name__ == .__main__.:", "if TYPE_CHECKING:"]
-omit = ["test/*"]
+name.".*".env-vars = [
+ { key = "PYTEST_XDIST_AUTO_NUM_WORKERS", value = "2" },
+ { key = "KOHEESIO__PRINT_LOGO", value = "False" },
+]
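The override above pins the worker count that `-n auto` resolves to (pytest-xdist honours `PYTEST_XDIST_AUTO_NUM_WORKERS`) and disables the logo during tests. The exact lookup inside Koheesio is not shown in this diff, so the snippet below is a hypothetical illustration only:

```python
# Hypothetical reading of the KOHEESIO__PRINT_LOGO flag set by the test environment.
import os

print_logo = os.environ.get("KOHEESIO__PRINT_LOGO", "True").lower() != "false"
if print_logo:
    print("K O H E E S I O")
```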
### ~~~~ ###
# Docsite #
### ~~~~ ###
#
+[tool.hatch.envs.default]
+description = """
+The default environment is used for development and general use.
+---
+We use the `uv` installer by default. This is a superfast, Rust-based installer.
+Run `hatch run` to run scripts in the default environment.
+
+# Code Quality
+To check and format the codebase, we use:
+ - `black` for code formatting
+ - `isort` for import sorting (includes colorama for colored output)
+ - `ruff` for linting.
+ - `mypy` for static type checking.
+ - `pylint` for code quality checks.
+---
+There are several ways to run style checks and formatting:
+- `hatch run black-check` will check the codebase with black without applying fixes.
+- `hatch run black-fmt` will format the codebase using black.
+- `hatch run isort-check` will check the codebase with isort without applying fixes.
+- `hatch run isort-fmt` will format the codebase using isort.
+- `hatch run ruff-check` will check the codebase with ruff without applying fixes.
+- `hatch run ruff-fmt` will format the codebase using ruff.
+- `hatch run mypy-check` will check the codebase with mypy.
+- `hatch run pylint-check` will check the codebase with pylint.
+- `hatch run check` will run all the above checks (including pylint and mypy).
+- `hatch run fmt` or `hatch run fix` will format the codebase using black, isort, and ruff.
+- `hatch run lint` will run ruff, mypy, and pylint.
+
+# Testing and Coverage
+To run the test suite, use:
+- `hatch run all-tests` to run the full test suite.
+- `hatch run spark-tests` to run the Spark test suite.
+- `hatch run log-versions` to log the Python and PySpark versions.
+- `hatch run coverage` or `hatch run cov` to run the test suite with coverage.
+
+Note: the full test suite will run all tests in all specified Python and PySpark versions. If you want to run tests
+against specific versions, you can add `+py=3.10` or `+version=pyspark34` to the command (replace the versions with the
+desired ones).
+
+For lighter / faster testing, use the `dev` environment with `hatch shell dev` and run the tests with `pytest` or use
+the `all-tests` or `spark-tests` command.
+"""
+installer = "uv"
+features = [
+ "async",
+ "async_http",
+ "pandas",
+ "pyspark",
+ "sftp",
+ "delta",
+ "se",
+ "box",
+ "dev",
+]
+
+
+[tool.hatch.envs.default.scripts]
+# TODO: add scripts section based on Makefile
+# TODO: add bandit
+# TODO: move scripts from linting and style here
+# Code Quality commands
+black-check = "black --check --diff ."
+black-fmt = "black ."
+isort-check = "isort . --check --diff --color"
+isort-fmt = "isort ."
+ruff-check = "ruff check ."
+ruff-fmt = "ruff check . --fix"
+mypy-check = "mypy koheesio"
+pylint-check = "pylint --output-format=colorized -d W0511 koheesio"
+check = [
+ "- black-check",
+ "- isort-check",
+ "- ruff-check",
+ "- mypy-check",
+ "- pylint-check",
+]
+fmt = ["black-fmt", "isort-fmt", "ruff-fmt"]
+fix = "fmt"
+lint = ["- ruff-fmt", "- mypy-check", "pylint-check"]
+log-versions = "python --version && {env:HATCH_UV} pip freeze | grep pyspark"
+test = "- pytest{env:HATCH_TEST_ARGS:} {args} -n 2"
+spark-tests = "test -m spark"
+non-spark-tests = "test -m \"not spark\""
+
+# scripts.run = "echo bla {env:HATCH_TEST_ARGS:} {args}"
+# scripts.run = "- log-versions && pytest tests/ {env:HATCH_TEST_ARGS:} {args}"
+# run ="echo {args}"
+# run = "- pytest tests/ {env:HATCH_TEST_ARGS:} {args}"
+# run-cov = "coverage run -m pytest{env:HATCH_TEST_ARGS:} {args}"
+# cov-combine = "coverage combine"
+# cov-report = "coverage report"
+# log-versions = "python --version && {env:HATCH_UV} pip freeze | grep pyspark"
+#
+#
+#
+# coverage = "- pytest tests/ {env:HATCH_TEST_ARGS:} {args} --cov=koheesio --cov-report=html --cov-report=term-missing --cov-fail-under=90"
+# cov = "coverage"
+
+
[tool.hatch.envs.docs]
description = """
The docs environment is used to build the documentation.
@@ -572,6 +624,18 @@ Available scripts:
- `coverage` or `cov` - run the test suite with coverage.
"""
path = ".venv"
-template = "test"
python = "3.10"
+template = "default"
+features = [
+ "async",
+ "async_http",
+ "box",
+ "pandas",
+ "pyspark",
+ "se",
+ "sftp",
+ "delta",
+ "dev",
+ "test",
+]
extra-dependencies = ["pyspark==3.4.*"]
diff --git a/test/_data/context/common.yml b/tests/_data/context/common.yml
similarity index 100%
rename from test/_data/context/common.yml
rename to tests/_data/context/common.yml
diff --git a/test/_data/context/dev.yml b/tests/_data/context/dev.yml
similarity index 100%
rename from test/_data/context/dev.yml
rename to tests/_data/context/dev.yml
diff --git a/test/_data/context/sample.json b/tests/_data/context/sample.json
similarity index 100%
rename from test/_data/context/sample.json
rename to tests/_data/context/sample.json
diff --git a/test/_data/context/sample.toml b/tests/_data/context/sample.toml
similarity index 100%
rename from test/_data/context/sample.toml
rename to tests/_data/context/sample.toml
diff --git a/test/_data/context/sample.yaml b/tests/_data/context/sample.yaml
similarity index 100%
rename from test/_data/context/sample.yaml
rename to tests/_data/context/sample.yaml
diff --git a/test/_data/readers/avro_file/.part-00000-c95e3fda-3943-4701-b0ae-cb6bc7e015e4-c000.avro.crc b/tests/_data/readers/avro_file/.part-00000-c95e3fda-3943-4701-b0ae-cb6bc7e015e4-c000.avro.crc
similarity index 100%
rename from test/_data/readers/avro_file/.part-00000-c95e3fda-3943-4701-b0ae-cb6bc7e015e4-c000.avro.crc
rename to tests/_data/readers/avro_file/.part-00000-c95e3fda-3943-4701-b0ae-cb6bc7e015e4-c000.avro.crc
diff --git a/test/_data/readers/avro_file/part-00000-c95e3fda-3943-4701-b0ae-cb6bc7e015e4-c000.avro b/tests/_data/readers/avro_file/part-00000-c95e3fda-3943-4701-b0ae-cb6bc7e015e4-c000.avro
similarity index 100%
rename from test/_data/readers/avro_file/part-00000-c95e3fda-3943-4701-b0ae-cb6bc7e015e4-c000.avro
rename to tests/_data/readers/avro_file/part-00000-c95e3fda-3943-4701-b0ae-cb6bc7e015e4-c000.avro
diff --git a/test/_data/readers/csv_file/dummy.csv b/tests/_data/readers/csv_file/dummy.csv
similarity index 100%
rename from test/_data/readers/csv_file/dummy.csv
rename to tests/_data/readers/csv_file/dummy.csv
diff --git a/test/_data/readers/csv_file/dummy_semicolon.csv b/tests/_data/readers/csv_file/dummy_semicolon.csv
similarity index 100%
rename from test/_data/readers/csv_file/dummy_semicolon.csv
rename to tests/_data/readers/csv_file/dummy_semicolon.csv
diff --git a/test/_data/readers/delta_file/.part-00000-0f8b246f-e779-4d48-896a-6a12362bf370-c000.snappy.parquet.crc b/tests/_data/readers/delta_file/.part-00000-0f8b246f-e779-4d48-896a-6a12362bf370-c000.snappy.parquet.crc
similarity index 100%
rename from test/_data/readers/delta_file/.part-00000-0f8b246f-e779-4d48-896a-6a12362bf370-c000.snappy.parquet.crc
rename to tests/_data/readers/delta_file/.part-00000-0f8b246f-e779-4d48-896a-6a12362bf370-c000.snappy.parquet.crc
diff --git a/test/_data/readers/delta_file/_delta_log/.00000000000000000000.json.crc b/tests/_data/readers/delta_file/_delta_log/.00000000000000000000.json.crc
similarity index 100%
rename from test/_data/readers/delta_file/_delta_log/.00000000000000000000.json.crc
rename to tests/_data/readers/delta_file/_delta_log/.00000000000000000000.json.crc
diff --git a/test/_data/readers/delta_file/_delta_log/00000000000000000000.json b/tests/_data/readers/delta_file/_delta_log/00000000000000000000.json
similarity index 100%
rename from test/_data/readers/delta_file/_delta_log/00000000000000000000.json
rename to tests/_data/readers/delta_file/_delta_log/00000000000000000000.json
diff --git a/test/_data/readers/delta_file/part-00000-0f8b246f-e779-4d48-896a-6a12362bf370-c000.snappy.parquet b/tests/_data/readers/delta_file/part-00000-0f8b246f-e779-4d48-896a-6a12362bf370-c000.snappy.parquet
similarity index 100%
rename from test/_data/readers/delta_file/part-00000-0f8b246f-e779-4d48-896a-6a12362bf370-c000.snappy.parquet
rename to tests/_data/readers/delta_file/part-00000-0f8b246f-e779-4d48-896a-6a12362bf370-c000.snappy.parquet
diff --git a/test/_data/readers/json_file/dummy.json b/tests/_data/readers/json_file/dummy.json
similarity index 100%
rename from test/_data/readers/json_file/dummy.json
rename to tests/_data/readers/json_file/dummy.json
diff --git a/test/_data/readers/json_file/dummy_simple.json b/tests/_data/readers/json_file/dummy_simple.json
similarity index 100%
rename from test/_data/readers/json_file/dummy_simple.json
rename to tests/_data/readers/json_file/dummy_simple.json
diff --git a/test/_data/readers/orc_file/.part-00000-ddcce057-2ef4-44b1-baf4-d263f748b060-c000.snappy.orc.crc b/tests/_data/readers/orc_file/.part-00000-ddcce057-2ef4-44b1-baf4-d263f748b060-c000.snappy.orc.crc
similarity index 100%
rename from test/_data/readers/orc_file/.part-00000-ddcce057-2ef4-44b1-baf4-d263f748b060-c000.snappy.orc.crc
rename to tests/_data/readers/orc_file/.part-00000-ddcce057-2ef4-44b1-baf4-d263f748b060-c000.snappy.orc.crc
diff --git a/test/_data/readers/orc_file/part-00000-ddcce057-2ef4-44b1-baf4-d263f748b060-c000.snappy.orc b/tests/_data/readers/orc_file/part-00000-ddcce057-2ef4-44b1-baf4-d263f748b060-c000.snappy.orc
similarity index 100%
rename from test/_data/readers/orc_file/part-00000-ddcce057-2ef4-44b1-baf4-d263f748b060-c000.snappy.orc
rename to tests/_data/readers/orc_file/part-00000-ddcce057-2ef4-44b1-baf4-d263f748b060-c000.snappy.orc
diff --git a/test/_data/sql/spark_sql_reader.sql b/tests/_data/sql/spark_sql_reader.sql
similarity index 100%
rename from test/_data/sql/spark_sql_reader.sql
rename to tests/_data/sql/spark_sql_reader.sql
diff --git a/test/_data/steps/expected_step_output.yaml b/tests/_data/steps/expected_step_output.yaml
similarity index 100%
rename from test/_data/steps/expected_step_output.yaml
rename to tests/_data/steps/expected_step_output.yaml
diff --git a/test/_data/steps/expected_step_output_simple.yaml b/tests/_data/steps/expected_step_output_simple.yaml
similarity index 100%
rename from test/_data/steps/expected_step_output_simple.yaml
rename to tests/_data/steps/expected_step_output_simple.yaml
diff --git a/test/_data/transformations/dummy.sql b/tests/_data/transformations/dummy.sql
similarity index 100%
rename from test/_data/transformations/dummy.sql
rename to tests/_data/transformations/dummy.sql
diff --git a/test/_data/transformations/spark_expectations_resources/test_product_dq_stats.ddl b/tests/_data/transformations/spark_expectations_resources/test_product_dq_stats.ddl
similarity index 100%
rename from test/_data/transformations/spark_expectations_resources/test_product_dq_stats.ddl
rename to tests/_data/transformations/spark_expectations_resources/test_product_dq_stats.ddl
diff --git a/test/_data/transformations/spark_expectations_resources/test_product_rules.ddl b/tests/_data/transformations/spark_expectations_resources/test_product_rules.ddl
similarity index 100%
rename from test/_data/transformations/spark_expectations_resources/test_product_rules.ddl
rename to tests/_data/transformations/spark_expectations_resources/test_product_rules.ddl
diff --git a/test/_data/transformations/spark_expectations_resources/test_product_rules.sql b/tests/_data/transformations/spark_expectations_resources/test_product_rules.sql
similarity index 100%
rename from test/_data/transformations/spark_expectations_resources/test_product_rules.sql
rename to tests/_data/transformations/spark_expectations_resources/test_product_rules.sql
diff --git a/test/_data/transformations/string_data/100000_rows_with_strings.json b/tests/_data/transformations/string_data/100000_rows_with_strings.json
similarity index 100%
rename from test/_data/transformations/string_data/100000_rows_with_strings.json
rename to tests/_data/transformations/string_data/100000_rows_with_strings.json
diff --git a/test/conftest.py b/tests/conftest.py
similarity index 100%
rename from test/conftest.py
rename to tests/conftest.py
diff --git a/test/core/test_context.py b/tests/core/test_context.py
similarity index 98%
rename from test/core/test_context.py
rename to tests/core/test_context.py
index b05c89af..920f7382 100644
--- a/test/core/test_context.py
+++ b/tests/core/test_context.py
@@ -1,6 +1,7 @@
from textwrap import dedent
import pytest
+
from pydantic import SecretStr
from koheesio.context import Context
@@ -10,15 +11,15 @@
test_context = Context(test_dict)
PROJECT_ROOT = get_project_root()
-CONTEXT_FOLDER = PROJECT_ROOT / "test" / "_data" / "context"
+CONTEXT_FOLDER = PROJECT_ROOT / "tests" / "_data" / "context"
SAMPLE_YAML = CONTEXT_FOLDER / "sample.yaml"
SAMPLE_JSON = CONTEXT_FOLDER / "sample.json"
def test_add():
context = Context(test_dict)
- context.add("unit", "test")
- assert context.unit == "test"
+ context.add("unit", "tests")
+ assert context.unit == "tests"
def test_get():
diff --git a/test/core/test_init.py b/tests/core/test_init.py
similarity index 100%
rename from test/core/test_init.py
rename to tests/core/test_init.py
diff --git a/test/core/test_logger.py b/tests/core/test_logger.py
similarity index 99%
rename from test/core/test_logger.py
rename to tests/core/test_logger.py
index 34ee4025..277d7664 100644
--- a/test/core/test_logger.py
+++ b/tests/core/test_logger.py
@@ -90,7 +90,7 @@ def test_log_masked(foo, bar, baz, params):
log_capture_string = StringIO()
ch = logging.StreamHandler(log_capture_string)
ch.setLevel(logging.DEBUG)
- logger = logging.getLogger("test")
+ logger = logging.getLogger("tests")
logger.setLevel(logging.DEBUG)
logger.addHandler(ch)
diff --git a/test/core/test_steps.py b/tests/core/test_steps.py
similarity index 99%
rename from test/core/test_steps.py
rename to tests/core/test_steps.py
index 41ab9ac3..2d046083 100644
--- a/test/core/test_steps.py
+++ b/tests/core/test_steps.py
@@ -7,7 +7,9 @@
from unittest.mock import call, patch
import pytest
+
from pydantic import ValidationError
+
from pyspark.sql import DataFrame
from pyspark.sql.functions import lit
@@ -197,7 +199,7 @@ def test_step_to_yaml_description_and_name(self):
# execute the step
test_step.execute()
- expected_output_path = PROJECT_ROOT / "test" / "_data" / "steps"
+ expected_output_path = PROJECT_ROOT / "tests" / "_data" / "steps"
# using .strip() to avoid the test failing on leading or trailing whitespaces
expected = (expected_output_path / "expected_step_output.yaml").read_text().strip()
diff --git a/test/deprecated/nike/ada/__init__.py b/tests/deprecated/nike/ada/__init__.py
similarity index 100%
rename from test/deprecated/nike/ada/__init__.py
rename to tests/deprecated/nike/ada/__init__.py
diff --git a/test/models/test_models.py b/tests/models/test_models.py
similarity index 99%
rename from test/models/test_models.py
rename to tests/models/test_models.py
index ca695db8..3d981f95 100644
--- a/test/models/test_models.py
+++ b/tests/models/test_models.py
@@ -1,6 +1,6 @@
import json
-from textwrap import dedent
from typing import Optional
+from textwrap import dedent
import pytest
import yaml
@@ -10,7 +10,6 @@
class TestBaseModel:
-
class SimpleModel(BaseModel):
a: int
b: str = "default"
diff --git a/test/spark/conftest.py b/tests/spark/conftest.py
similarity index 97%
rename from test/spark/conftest.py
rename to tests/spark/conftest.py
index 71406bd1..7066ee92 100644
--- a/test/spark/conftest.py
+++ b/tests/spark/conftest.py
@@ -8,6 +8,7 @@
import pytest
from delta import configure_spark_with_delta_pip
+
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import (
ArrayType,
@@ -34,7 +35,7 @@
from koheesio.utils import get_project_root
PROJECT_ROOT = get_project_root()
-TEST_DATA_PATH = Path(PROJECT_ROOT / "test" / "_data")
+TEST_DATA_PATH = Path(PROJECT_ROOT / "tests" / "_data")
DELTA_FILE = Path(TEST_DATA_PATH / "readers" / "delta_file")
@@ -67,6 +68,8 @@ def spark(warehouse_path, random_uuid):
.config("spark.sql.warehouse.dir", warehouse_path)
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.config("spark.sql.session.timeZone", "UTC")
+ .config("spark.sql.execution.arrow.pyspark.enabled", "true")
+ .config("spark.sql.execution.arrow.pyspark.fallback.enabled", "true")
)
spark_session = configure_spark_with_delta_pip(builder).getOrCreate()
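The two `.config` lines added to the fixture enable Arrow-backed transfers between Spark and pandas, with a fallback to the slower path when Arrow cannot be used. A standalone sketch of the same settings outside the fixture:

```python
# Minimal sketch: the same Arrow settings applied to a throwaway session.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("arrow-sketch")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.sql.execution.arrow.pyspark.fallback.enabled", "true")
    .getOrCreate()
)

# toPandas() and createDataFrame(pandas_df) now use Arrow for the columnar transfer.
pdf = spark.range(10).toPandas()
spark.stop()
```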
diff --git a/test/spark/integrations/box/test_box.py b/tests/spark/integrations/box/test_box.py
similarity index 99%
rename from test/spark/integrations/box/test_box.py
rename to tests/spark/integrations/box/test_box.py
index cf851562..5ddf59c5 100644
--- a/test/spark/integrations/box/test_box.py
+++ b/tests/spark/integrations/box/test_box.py
@@ -3,6 +3,7 @@
from pathlib import PurePath
import pytest
+
from pydantic import ValidationError
from koheesio.steps.integrations.box import (
@@ -24,6 +25,8 @@
StructType,
)
+pytestmark = pytest.mark.spark
+
COMMON_PARAMS = {
"client_id": SecretStr("client_id"),
"client_secret": SecretStr("client_secret"),
diff --git a/test/spark/integrations/dq/test_spark_expectations.py b/tests/spark/integrations/dq/test_spark_expectations.py
similarity index 89%
rename from test/spark/integrations/dq/test_spark_expectations.py
rename to tests/spark/integrations/dq/test_spark_expectations.py
index 983ff0c4..a4fff146 100644
--- a/test/spark/integrations/dq/test_spark_expectations.py
+++ b/tests/spark/integrations/dq/test_spark_expectations.py
@@ -1,15 +1,16 @@
from typing import List, Union
import pytest
+
from pyspark.sql import SparkSession
-from koheesio.steps.integrations.dq.spark_expectations import (
- SparkExpectationsTransformation,
-)
from koheesio.utils import get_project_root
PROJECT_ROOT = get_project_root()
+pytestmark = pytest.mark.spark
+pytestmark = pytest.mark.skip(reason="Skipping all tests in this module due to the spark expectation package issues")
+
class TestSparkExpectationsTransform:
"""
@@ -45,6 +46,10 @@ def prepare_tables(self, spark, data_path):
)
def test_rows_are_dropped(self, spark: SparkSession, prepare_tables):
+ from koheesio.steps.integrations.dq.spark_expectations import (
+ SparkExpectationsTransformation,
+ )
+
input_df = spark.createDataFrame(
TestSparkExpectationsTransform.input_data,
schema=TestSparkExpectationsTransform.input_schema,
@@ -66,6 +71,10 @@ def test_rows_are_dropped(self, spark: SparkSession, prepare_tables):
assert err_table_df.count() == 2
def test_meta_columns_are_not_dropped(self, spark, prepare_tables):
+ from koheesio.steps.integrations.dq.spark_expectations import (
+ SparkExpectationsTransformation,
+ )
+
input_df = spark.createDataFrame(
TestSparkExpectationsTransform.input_data,
schema=TestSparkExpectationsTransform.input_schema,
@@ -87,6 +96,10 @@ def test_meta_columns_are_not_dropped(self, spark, prepare_tables):
assert "meta_dq_run_date" in output_columns or "meta_dq_run_datetime" in output_columns
def test_meta_columns_are_dropped(self, spark, prepare_tables):
+ from koheesio.steps.integrations.dq.spark_expectations import (
+ SparkExpectationsTransformation,
+ )
+
input_df = spark.createDataFrame(
TestSparkExpectationsTransform.input_data,
schema=TestSparkExpectationsTransform.input_schema,
@@ -117,6 +130,10 @@ def apply_spark_sql(spark: SparkSession, source_sql_files: Union[List[str], str]
spark.sql(content)
def test_with_full_se_user_conf(self):
+ from koheesio.steps.integrations.dq.spark_expectations import (
+ SparkExpectationsTransformation,
+ )
+
conf = {
"spark.expectations.notifications.email.enabled": False,
"spark.expectations.notifications.email.smtp_host": "mailhost.email.com",
@@ -143,6 +160,10 @@ def test_with_full_se_user_conf(self):
assert instance.se_user_conf == conf
def test_overwrite_error_writer(self):
+ from koheesio.steps.integrations.dq.spark_expectations import (
+ SparkExpectationsTransformation,
+ )
+
"""
Test that the error_writer can be overwritten using error_writer_mode and error_writer_format
"""
@@ -168,6 +189,10 @@ def test_overwrite_error_writer(self):
assert error_writer._options == {"mergeSchema": "true"}
def test_overwrite_stats_writer(self):
+ from koheesio.steps.integrations.dq.spark_expectations import (
+ SparkExpectationsTransformation,
+ )
+
"""
Test that the stats_writer can be overwritten using stats_writer_mode and stats_writer_format and that the
error_writer_options default = {}.
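The hunks above combine a module-level skip with imports deferred into the test bodies, so collecting the module never triggers the problematic `spark_expectations` import. Note that assigning `pytestmark` twice keeps only the last marker; a list preserves both. A condensed sketch of the pattern:

```python
# Sketch of the skip-plus-deferred-import pattern used in this module.
import pytest

# A list keeps both markers; a second plain assignment would overwrite the first.
pytestmark = [
    pytest.mark.spark,
    pytest.mark.skip(reason="spark expectations package issues"),
]


def test_example():
    # Imported lazily so module collection never imports the package.
    from koheesio.steps.integrations.dq.spark_expectations import (
        SparkExpectationsTransformation,
    )

    assert SparkExpectationsTransformation is not None
```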
diff --git a/test/spark/integrations/snowflake/test_snowflake.py b/tests/spark/integrations/snowflake/test_snowflake.py
similarity index 99%
rename from test/spark/integrations/snowflake/test_snowflake.py
rename to tests/spark/integrations/snowflake/test_snowflake.py
index 6215494d..a8872c24 100644
--- a/test/spark/integrations/snowflake/test_snowflake.py
+++ b/tests/spark/integrations/snowflake/test_snowflake.py
@@ -3,6 +3,7 @@
from unittest.mock import Mock, patch
import pytest
+
from pyspark.sql import SparkSession
from pyspark.sql import types as t
@@ -26,6 +27,8 @@
)
from koheesio.steps.writers import BatchOutputMode
+pytestmark = pytest.mark.spark
+
COMMON_OPTIONS = {
"url": "url",
"user": "user",
diff --git a/test/spark/integrations/snowflake/test_sync_task.py b/tests/spark/integrations/snowflake/test_sync_task.py
similarity index 99%
rename from test/spark/integrations/snowflake/test_sync_task.py
rename to tests/spark/integrations/snowflake/test_sync_task.py
index 2563134b..93ef50eb 100644
--- a/test/spark/integrations/snowflake/test_sync_task.py
+++ b/tests/spark/integrations/snowflake/test_sync_task.py
@@ -4,9 +4,11 @@
from unittest import mock
import chispa
-import pydantic
import pytest
from conftest import await_job_completion
+
+import pydantic
+
from pyspark.sql import DataFrame
from koheesio.steps.delta import DeltaTableStep
@@ -20,6 +22,8 @@
from koheesio.steps.writers.delta import DeltaTableWriter
from koheesio.steps.writers.stream import ForEachBatchStreamWriter
+pytestmark = pytest.mark.spark
+
COMMON_OPTIONS = {
"source_table": DeltaTableStep(table=""),
"target_table": "foo.bar",
@@ -40,7 +44,7 @@
@pytest.fixture
def snowflake_staging_file():
- filename = "test/data/snowflake_staging.parq"
+ filename = "tests/_data/snowflake_staging.parq"
if os.path.exists(filename):
shutil.rmtree(filename)
diff --git a/test/spark/readers/test_auto_loader.py b/tests/spark/readers/test_auto_loader.py
similarity index 97%
rename from test/spark/readers/test_auto_loader.py
rename to tests/spark/readers/test_auto_loader.py
index e0c7f1d7..b72b36b7 100644
--- a/test/spark/readers/test_auto_loader.py
+++ b/tests/spark/readers/test_auto_loader.py
@@ -1,11 +1,12 @@
-from pathlib import Path
-
import pytest
from chispa import assert_df_equality
+
from pyspark.sql.types import *
from koheesio.steps.readers.databricks.autoloader import AutoLoader
+pytestmark = pytest.mark.spark
+
@pytest.mark.parametrize(
"bad_format",
diff --git a/test/spark/readers/test_delta_reader.py b/tests/spark/readers/test_delta_reader.py
similarity index 99%
rename from test/spark/readers/test_delta_reader.py
rename to tests/spark/readers/test_delta_reader.py
index 7511ba97..0db2db82 100644
--- a/test/spark/readers/test_delta_reader.py
+++ b/tests/spark/readers/test_delta_reader.py
@@ -1,10 +1,13 @@
import pytest
+
from pyspark.sql import functions as F
from pyspark.sql.dataframe import DataFrame
from koheesio.steps.readers.delta import DeltaTableReader
from koheesio.steps.spark import AnalysisException
+pytestmark = pytest.mark.spark
+
def test_delta_table_reader(spark):
df = DeltaTableReader(table="delta_test_table").read()
diff --git a/test/spark/readers/test_file_loader.py b/tests/spark/readers/test_file_loader.py
similarity index 99%
rename from test/spark/readers/test_file_loader.py
rename to tests/spark/readers/test_file_loader.py
index a24662bb..01f412e3 100644
--- a/test/spark/readers/test_file_loader.py
+++ b/tests/spark/readers/test_file_loader.py
@@ -11,6 +11,8 @@
)
from koheesio.steps.spark import AnalysisException
+pytestmark = pytest.mark.spark
+
@pytest.fixture()
def json_file(data_path):
diff --git a/test/spark/readers/test_hana.py b/tests/spark/readers/test_hana.py
similarity index 94%
rename from test/spark/readers/test_hana.py
rename to tests/spark/readers/test_hana.py
index 33980f6d..7375038a 100644
--- a/test/spark/readers/test_hana.py
+++ b/tests/spark/readers/test_hana.py
@@ -1,9 +1,13 @@
from unittest import mock
+import pytest
+
from pyspark.sql import SparkSession
from koheesio.steps.readers.hana import HanaReader
+pytestmark = pytest.mark.spark
+
class TestHanaReader:
common_options = {
diff --git a/test/spark/readers/test_jdbc.py b/tests/spark/readers/test_jdbc.py
similarity index 98%
rename from test/spark/readers/test_jdbc.py
rename to tests/spark/readers/test_jdbc.py
index 0816c85c..0dd8c83a 100644
--- a/test/spark/readers/test_jdbc.py
+++ b/tests/spark/readers/test_jdbc.py
@@ -1,10 +1,13 @@
from unittest import mock
import pytest
+
from pyspark.sql import SparkSession
from koheesio.steps.readers.jdbc import JdbcReader
+pytestmark = pytest.mark.spark
+
class TestJdbcReader:
common_options = {
diff --git a/test/spark/readers/test_memory.py b/tests/spark/readers/test_memory.py
similarity index 98%
rename from test/spark/readers/test_memory.py
rename to tests/spark/readers/test_memory.py
index 56af0314..795f6f07 100644
--- a/test/spark/readers/test_memory.py
+++ b/tests/spark/readers/test_memory.py
@@ -1,9 +1,12 @@
import pytest
from chispa import assert_df_equality
+
from pyspark.sql.types import StructType
from koheesio.steps.readers.memory import DataFormat, InMemoryDataReader
+pytestmark = pytest.mark.spark
+
class TestInMemoryDataReader:
# fmt: off
diff --git a/test/spark/readers/test_metastore_reader.py b/tests/spark/readers/test_metastore_reader.py
similarity index 87%
rename from test/spark/readers/test_metastore_reader.py
rename to tests/spark/readers/test_metastore_reader.py
index e107f78c..14fdc08d 100644
--- a/test/spark/readers/test_metastore_reader.py
+++ b/tests/spark/readers/test_metastore_reader.py
@@ -1,7 +1,11 @@
+import pytest
+
from pyspark.sql.dataframe import DataFrame
from koheesio.steps.readers.metastore import MetastoreReader
+pytestmark = pytest.mark.spark
+
def test_metastore_reader(spark):
df = MetastoreReader(table="klettern.delta_test_table").read()
diff --git a/test/spark/readers/test_reader.py b/tests/spark/readers/test_reader.py
similarity index 89%
rename from test/spark/readers/test_reader.py
rename to tests/spark/readers/test_reader.py
index cbe09983..e5437d00 100644
--- a/test/spark/readers/test_reader.py
+++ b/tests/spark/readers/test_reader.py
@@ -1,5 +1,9 @@
+import pytest
+
from koheesio.steps.readers.dummy import DummyReader
+pytestmark = pytest.mark.spark
+
def test_reader(spark):
test_reader = DummyReader(range=1)
diff --git a/test/spark/readers/test_rest_api.py b/tests/spark/readers/test_rest_api.py
similarity index 99%
rename from test/spark/readers/test_rest_api.py
rename to tests/spark/readers/test_rest_api.py
index 9dcc6e9b..a7da1567 100644
--- a/test/spark/readers/test_rest_api.py
+++ b/tests/spark/readers/test_rest_api.py
@@ -2,9 +2,10 @@
import requests_mock
from aiohttp import ClientSession, TCPConnector
from aiohttp_retry import ExponentialRetry
-from pyspark.sql.types import MapType, StringType, StructField, StructType
from yarl import URL
+from pyspark.sql.types import MapType, StringType, StructField, StructType
+
from koheesio.steps.asyncio.http import AsyncHttpStep
from koheesio.steps.http import PaginatedHtppGetStep
from koheesio.steps.readers.rest_api import AsyncHttpGetStep, RestApiReader
@@ -12,6 +13,8 @@
ASYNC_BASE_URL = "http://httpbin.org"
ASYNC_GET_ENDPOINT = URL(f"{ASYNC_BASE_URL}/get")
+pytestmark = pytest.mark.spark
+
@pytest.fixture
def mock_paginated_api():
diff --git a/test/spark/readers/test_spark_sql_reader.py b/tests/spark/readers/test_spark_sql_reader.py
similarity index 85%
rename from test/spark/readers/test_spark_sql_reader.py
rename to tests/spark/readers/test_spark_sql_reader.py
index 5d6cdb47..c03f21d6 100644
--- a/test/spark/readers/test_spark_sql_reader.py
+++ b/tests/spark/readers/test_spark_sql_reader.py
@@ -2,6 +2,8 @@
from koheesio.steps.readers.spark_sql_reader import SparkSqlReader
+pytestmark = pytest.mark.spark
+
def test_spark_sql_reader(spark, data_path):
spark.sql("CREATE TABLE IF NOT EXISTS table_A (id INT, a_name STRING) USING DELTA")
@@ -27,10 +29,10 @@ def test_spark_sql_reader(spark, data_path):
def test_spark_sql_reader_failed():
with pytest.raises(ValueError):
- SparkSqlReader(sql="SELECT 1", sql_path="test/resources/sql/none_existent_path.sql")
+ SparkSqlReader(sql="SELECT 1", sql_path="tests/resources/sql/none_existent_path.sql")
with pytest.raises(FileNotFoundError):
- SparkSqlReader(sql_path="test/resources/sql/none_existent_path.sql")
+ SparkSqlReader(sql_path="tests/resources/sql/none_existent_path.sql")
with pytest.raises(ValueError):
SparkSqlReader(sql=None)
diff --git a/test/spark/readers/test_teradata.py b/tests/spark/readers/test_teradata.py
similarity index 94%
rename from test/spark/readers/test_teradata.py
rename to tests/spark/readers/test_teradata.py
index c80b8c3a..244c1fb3 100644
--- a/test/spark/readers/test_teradata.py
+++ b/tests/spark/readers/test_teradata.py
@@ -1,9 +1,13 @@
from unittest import mock
+import pytest
+
from pyspark.sql import SparkSession
from koheesio.steps.readers.teradata import TeradataReader
+pytestmark = pytest.mark.spark
+
class TestTeradataReader:
common_options = {
diff --git a/test/spark/tasks/test_etl_task.py b/tests/spark/tasks/test_etl_task.py
similarity index 98%
rename from test/spark/tasks/test_etl_task.py
rename to tests/spark/tasks/test_etl_task.py
index a09ab40d..26a9932c 100644
--- a/test/spark/tasks/test_etl_task.py
+++ b/tests/spark/tasks/test_etl_task.py
@@ -1,4 +1,6 @@
+import pytest
from conftest import await_job_completion
+
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, lit
@@ -12,6 +14,8 @@
from koheesio.steps.writers.dummy import DummyWriter
from koheesio.tasks.etl_task import EtlTask
+pytestmark = pytest.mark.spark
+
def dummy_function(df: DataFrame):
return df.withColumn("hello", lit("world"))
diff --git a/test/spark/test_delta.py b/tests/spark/test_delta.py
similarity index 99%
rename from test/spark/test_delta.py
rename to tests/spark/test_delta.py
index d0aa8aba..fe79ab87 100644
--- a/test/spark/test_delta.py
+++ b/tests/spark/test_delta.py
@@ -3,12 +3,16 @@
import pytest
from conftest import setup_test_data
+
from pydantic import ValidationError
+
from pyspark.sql.types import LongType
from koheesio.logger import LoggingFactory
from koheesio.steps.delta import DeltaTableStep
+pytestmark = pytest.mark.spark
+
log = LoggingFactory.get_logger(name="test_delta")
diff --git a/test/spark/test_spark.py b/tests/spark/test_spark.py
similarity index 90%
rename from test/spark/test_spark.py
rename to tests/spark/test_spark.py
index 93f50306..078d9144 100644
--- a/test/spark/test_spark.py
+++ b/tests/spark/test_spark.py
@@ -13,6 +13,8 @@
from koheesio.models import SecretStr
+pytestmark = pytest.mark.spark
+
class TestSparkImportFailures:
def test_import_error_no_error(self):
@@ -27,6 +29,6 @@ def test_import_error_with_error(self):
with pytest.raises(ImportError):
from pyspark.sql import SparkSession
- SparkSession.builder.appName("test").getOrCreate()
+ SparkSession.builder.appName("tests").getOrCreate()
pass
diff --git a/test/spark/test_warnings.py b/tests/spark/test_warnings.py
similarity index 96%
rename from test/spark/test_warnings.py
rename to tests/spark/test_warnings.py
index 49168fdc..73c69d1d 100644
--- a/test/spark/test_warnings.py
+++ b/tests/spark/test_warnings.py
@@ -4,6 +4,8 @@
from koheesio.steps.writers import BatchOutputMode
+pytestmark = pytest.mark.spark
+
@pytest.mark.parametrize("output_mode", [(BatchOutputMode.APPEND)])
def test_muted_warnings(output_mode, dummy_df, spark):
diff --git a/test/spark/transformations/date_time/test_date_time.py b/tests/spark/transformations/date_time/test_date_time.py
similarity index 99%
rename from test/spark/transformations/date_time/test_date_time.py
rename to tests/spark/transformations/date_time/test_date_time.py
index b644012d..cfb6169c 100644
--- a/test/spark/transformations/date_time/test_date_time.py
+++ b/tests/spark/transformations/date_time/test_date_time.py
@@ -9,6 +9,8 @@
ToTimestamp,
)
+pytestmark = pytest.mark.spark
+
log = LoggingFactory.get_logger(name="test_date_time")
diff --git a/test/spark/transformations/date_time/test_interval.py b/tests/spark/transformations/date_time/test_interval.py
similarity index 99%
rename from test/spark/transformations/date_time/test_interval.py
rename to tests/spark/transformations/date_time/test_interval.py
index b6581bbe..5c7d177c 100644
--- a/test/spark/transformations/date_time/test_interval.py
+++ b/tests/spark/transformations/date_time/test_interval.py
@@ -1,6 +1,7 @@
import datetime as dt
import pytest
+
from pyspark.sql import types as T
from koheesio.logger import LoggingFactory
@@ -13,6 +14,8 @@
dt_column,
)
+pytestmark = pytest.mark.spark
+
log = LoggingFactory.get_logger(name="test_date_time")
diff --git a/test/spark/transformations/strings/test_change_case.py b/tests/spark/transformations/strings/test_change_case.py
similarity index 99%
rename from test/spark/transformations/strings/test_change_case.py
rename to tests/spark/transformations/strings/test_change_case.py
index 64599ca5..e745d2bc 100644
--- a/test/spark/transformations/strings/test_change_case.py
+++ b/tests/spark/transformations/strings/test_change_case.py
@@ -12,6 +12,8 @@
UpperCase,
)
+pytestmark = pytest.mark.spark
+
log = LoggingFactory.get_logger(name=__file__, inherit_from_koheesio=True)
data = [
["Banana lemon orange", 1000, "USA"],
diff --git a/test/spark/transformations/strings/test_concat.py b/tests/spark/transformations/strings/test_concat.py
similarity index 99%
rename from test/spark/transformations/strings/test_concat.py
rename to tests/spark/transformations/strings/test_concat.py
index 145c2b41..c921ba7c 100644
--- a/test/spark/transformations/strings/test_concat.py
+++ b/tests/spark/transformations/strings/test_concat.py
@@ -5,6 +5,8 @@
from koheesio.logger import LoggingFactory
from koheesio.steps.transformations.strings.concat import Concat
+pytestmark = pytest.mark.spark
+
log = LoggingFactory.get_logger(name=__file__, inherit_from_koheesio=True)
diff --git a/test/spark/transformations/strings/test_pad.py b/tests/spark/transformations/strings/test_pad.py
similarity index 99%
rename from test/spark/transformations/strings/test_pad.py
rename to tests/spark/transformations/strings/test_pad.py
index aca8674e..145f4630 100644
--- a/test/spark/transformations/strings/test_pad.py
+++ b/tests/spark/transformations/strings/test_pad.py
@@ -8,6 +8,8 @@
from koheesio.models import ValidationError
from koheesio.steps.transformations.strings.pad import LPad, Pad, RPad
+pytestmark = pytest.mark.spark
+
log = LoggingFactory.get_logger(name=__file__, inherit_from_koheesio=True)
data = [[1, 2, "hello"], [3, 4, "world"], [3, 4, None]]
diff --git a/test/spark/transformations/strings/test_regexp.py b/tests/spark/transformations/strings/test_regexp.py
similarity index 99%
rename from test/spark/transformations/strings/test_regexp.py
rename to tests/spark/transformations/strings/test_regexp.py
index af4022b6..211a44eb 100644
--- a/test/spark/transformations/strings/test_regexp.py
+++ b/tests/spark/transformations/strings/test_regexp.py
@@ -7,6 +7,8 @@
from koheesio.logger import LoggingFactory
from koheesio.steps.transformations.strings.regexp import RegexpExtract, RegexpReplace
+pytestmark = pytest.mark.spark
+
log = LoggingFactory.get_logger(name=__file__, inherit_from_koheesio=True)
data_year_wk = [["2020 W1"], ["2021 WK2"], ["2022WK3"]]
diff --git a/test/spark/transformations/strings/test_split.py b/tests/spark/transformations/strings/test_split.py
similarity index 99%
rename from test/spark/transformations/strings/test_split.py
rename to tests/spark/transformations/strings/test_split.py
index cd0bf261..91b31a87 100644
--- a/test/spark/transformations/strings/test_split.py
+++ b/tests/spark/transformations/strings/test_split.py
@@ -7,6 +7,8 @@
from koheesio.logger import LoggingFactory
from koheesio.steps.transformations.strings.split import SplitAll, SplitAtFirstMatch
+pytestmark = pytest.mark.spark
+
log = LoggingFactory.get_logger(name=__file__, inherit_from_koheesio=True)
string_data = [
diff --git a/test/spark/transformations/strings/test_string_replace.py b/tests/spark/transformations/strings/test_string_replace.py
similarity index 98%
rename from test/spark/transformations/strings/test_string_replace.py
rename to tests/spark/transformations/strings/test_string_replace.py
index 0beeb684..d9280bb8 100644
--- a/test/spark/transformations/strings/test_string_replace.py
+++ b/tests/spark/transformations/strings/test_string_replace.py
@@ -7,6 +7,8 @@
from koheesio.logger import LoggingFactory
from koheesio.steps.transformations.strings.replace import Replace
+pytestmark = pytest.mark.spark
+
log = LoggingFactory.get_logger(name=__file__, inherit_from_koheesio=True)
data_with_strings = [[1, 2, "hello"], [3, 4, "world"], [3, 4, None]]
diff --git a/test/spark/transformations/strings/test_substring.py b/tests/spark/transformations/strings/test_substring.py
similarity index 98%
rename from test/spark/transformations/strings/test_substring.py
rename to tests/spark/transformations/strings/test_substring.py
index 4db189a7..1be99f53 100644
--- a/test/spark/transformations/strings/test_substring.py
+++ b/tests/spark/transformations/strings/test_substring.py
@@ -7,6 +7,8 @@
from koheesio.logger import LoggingFactory
from koheesio.steps.transformations.strings.substring import Substring
+pytestmark = pytest.mark.spark
+
log = LoggingFactory.get_logger(name=__file__, inherit_from_koheesio=True)
data_with_strings = [[1, 2, "hello"], [3, 4, "world"], [3, 4, None]]
diff --git a/test/spark/transformations/strings/test_trim.py b/tests/spark/transformations/strings/test_trim.py
similarity index 99%
rename from test/spark/transformations/strings/test_trim.py
rename to tests/spark/transformations/strings/test_trim.py
index 72325b96..742279b2 100644
--- a/test/spark/transformations/strings/test_trim.py
+++ b/tests/spark/transformations/strings/test_trim.py
@@ -3,11 +3,14 @@
"""
import pytest
+
from pydantic import ValidationError
from koheesio.logger import LoggingFactory
from koheesio.steps.transformations.strings.trim import LTrim, RTrim, Trim
+pytestmark = pytest.mark.spark
+
log = LoggingFactory.get_logger(name=__file__, inherit_from_koheesio=True)
diff --git a/test/spark/transformations/test_arrays.py b/tests/spark/transformations/test_arrays.py
similarity index 99%
rename from test/spark/transformations/test_arrays.py
rename to tests/spark/transformations/test_arrays.py
index 6a71bb2f..61ae1c60 100644
--- a/test/spark/transformations/test_arrays.py
+++ b/tests/spark/transformations/test_arrays.py
@@ -1,6 +1,7 @@
import math
import pytest
+
from pyspark.sql.types import (
ArrayType,
FloatType,
@@ -25,6 +26,8 @@
ExplodeDistinct,
)
+pytestmark = pytest.mark.spark
+
input_data = [
(1, [1, 2, 2, 3, 3, 3], ["a", "b", "b", "c", "c", " ", None], [1.0, 2.0, 2.0, 3.0, 3.0, 3.0, float("nan")]),
(2, [4, 4, 5, 5, 6, 6, None], ["d", "e", "e", "f", "f", "f"], [4.0, 5.0, 5.0, 6.0, 6.0, 6.0]),
diff --git a/test/spark/transformations/test_camel_to_snake_transform.py b/tests/spark/transformations/test_camel_to_snake_transform.py
similarity index 97%
rename from test/spark/transformations/test_camel_to_snake_transform.py
rename to tests/spark/transformations/test_camel_to_snake_transform.py
index f46dfdcb..fe31b503 100644
--- a/test/spark/transformations/test_camel_to_snake_transform.py
+++ b/tests/spark/transformations/test_camel_to_snake_transform.py
@@ -1,9 +1,12 @@
import pytest
+
from pyspark.sql import functions as F
from koheesio.steps.readers.dummy import DummyReader
from koheesio.steps.transformations.camel_to_snake import CamelToSnakeTransformation
+pytestmark = pytest.mark.spark
+
class TestCamelToSnakeTransformation:
@pytest.fixture
diff --git a/test/spark/transformations/test_cast_to_datatype.py b/tests/spark/transformations/test_cast_to_datatype.py
similarity index 99%
rename from test/spark/transformations/test_cast_to_datatype.py
rename to tests/spark/transformations/test_cast_to_datatype.py
index 4447db5a..11ee7c20 100644
--- a/test/spark/transformations/test_cast_to_datatype.py
+++ b/tests/spark/transformations/test_cast_to_datatype.py
@@ -6,7 +6,9 @@
from decimal import Decimal
import pytest
+
from pydantic import ValidationError
+
from pyspark.sql import DataFrame
from pyspark.sql import functions as f
@@ -27,6 +29,8 @@
)
from koheesio.utils import SparkDatatype
+pytestmark = pytest.mark.spark
+
@pytest.mark.parametrize(
"input_values,expected",
diff --git a/test/spark/transformations/test_drop_column.py b/tests/spark/transformations/test_drop_column.py
similarity index 98%
rename from test/spark/transformations/test_drop_column.py
rename to tests/spark/transformations/test_drop_column.py
index 42ab9f7f..a0427b72 100644
--- a/test/spark/transformations/test_drop_column.py
+++ b/tests/spark/transformations/test_drop_column.py
@@ -2,6 +2,8 @@
from koheesio.steps.transformations.drop_column import DropColumn
+pytestmark = pytest.mark.spark
+
@pytest.mark.parametrize(
"input_values,expected",
diff --git a/test/spark/transformations/test_get_item.py b/tests/spark/transformations/test_get_item.py
similarity index 98%
rename from test/spark/transformations/test_get_item.py
rename to tests/spark/transformations/test_get_item.py
index abd7fc1d..cc3d632c 100644
--- a/test/spark/transformations/test_get_item.py
+++ b/tests/spark/transformations/test_get_item.py
@@ -2,6 +2,8 @@
from koheesio.steps.transformations.get_item import GetItem
+pytestmark = pytest.mark.spark
+
@pytest.mark.parametrize(
"input_values,input_data,input_schema,expected",
diff --git a/test/spark/transformations/test_hash.py b/tests/spark/transformations/test_hash.py
similarity index 99%
rename from test/spark/transformations/test_hash.py
rename to tests/spark/transformations/test_hash.py
index ef4452ae..1c0245c7 100644
--- a/test/spark/transformations/test_hash.py
+++ b/tests/spark/transformations/test_hash.py
@@ -5,6 +5,8 @@
from koheesio.logger import LoggingFactory
from koheesio.steps.transformations.hash import Sha2Hash
+pytestmark = pytest.mark.spark
+
log = LoggingFactory.get_logger(name="test_hash")
diff --git a/test/spark/transformations/test_lookup.py b/tests/spark/transformations/test_lookup.py
similarity index 98%
rename from test/spark/transformations/test_lookup.py
rename to tests/spark/transformations/test_lookup.py
index 4398ccd4..890d7c37 100644
--- a/test/spark/transformations/test_lookup.py
+++ b/tests/spark/transformations/test_lookup.py
@@ -1,4 +1,5 @@
import pytest
+
from pyspark.sql import SparkSession
from koheesio.steps.transformations.lookup import (
@@ -9,6 +10,8 @@
TargetColumn,
)
+pytestmark = pytest.mark.spark
+
def test_join_mapping_column_test() -> None:
assert str(JoinMapping(source_column="source", other_column="joined").column) == "Column<'joined AS source'>"
diff --git a/test/spark/transformations/test_repartition.py b/tests/spark/transformations/test_repartition.py
similarity index 98%
rename from test/spark/transformations/test_repartition.py
rename to tests/spark/transformations/test_repartition.py
index 780fee95..ab557870 100644
--- a/test/spark/transformations/test_repartition.py
+++ b/tests/spark/transformations/test_repartition.py
@@ -3,6 +3,8 @@
from koheesio.models import ValidationError
from koheesio.steps.transformations.repartition import Repartition
+pytestmark = pytest.mark.spark
+
@pytest.mark.parametrize(
"input_values,expected",
diff --git a/test/spark/transformations/test_replace.py b/tests/spark/transformations/test_replace.py
similarity index 99%
rename from test/spark/transformations/test_replace.py
rename to tests/spark/transformations/test_replace.py
index 7f54fbf4..e8ad7491 100644
--- a/test/spark/transformations/test_replace.py
+++ b/tests/spark/transformations/test_replace.py
@@ -3,6 +3,8 @@
from koheesio.logger import LoggingFactory
from koheesio.steps.transformations.replace import Replace
+pytestmark = pytest.mark.spark
+
@pytest.mark.parametrize(
"input_values,expected",
diff --git a/test/spark/transformations/test_row_number_dedup.py b/tests/spark/transformations/test_row_number_dedup.py
similarity index 99%
rename from test/spark/transformations/test_row_number_dedup.py
rename to tests/spark/transformations/test_row_number_dedup.py
index b9ad86bc..aafe55c0 100644
--- a/test/spark/transformations/test_row_number_dedup.py
+++ b/tests/spark/transformations/test_row_number_dedup.py
@@ -1,12 +1,15 @@
from datetime import datetime
import pytest
+
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from koheesio.steps.transformations.rank_dedup import RankDedup
from koheesio.steps.transformations.row_number_dedup import RowNumberDedup
+pytestmark = pytest.mark.spark
+
@pytest.mark.parametrize("target_column", ["col_row_nuber"])
def test_row_number_dedup(spark: SparkSession, target_column: str) -> None:
diff --git a/test/spark/transformations/test_sql_transform.py b/tests/spark/transformations/test_sql_transform.py
similarity index 99%
rename from test/spark/transformations/test_sql_transform.py
rename to tests/spark/transformations/test_sql_transform.py
index edcccdce..0b60f512 100644
--- a/test/spark/transformations/test_sql_transform.py
+++ b/tests/spark/transformations/test_sql_transform.py
@@ -6,6 +6,8 @@
from koheesio.logger import LoggingFactory
from koheesio.steps.transformations.sql_transform import SqlTransform
+pytestmark = pytest.mark.spark
+
log = LoggingFactory.get_logger(name="test_sql_transform")
diff --git a/test/spark/transformations/test_transform.py b/tests/spark/transformations/test_transform.py
similarity index 98%
rename from test/spark/transformations/test_transform.py
rename to tests/spark/transformations/test_transform.py
index 9233e4e5..d39e09ca 100644
--- a/test/spark/transformations/test_transform.py
+++ b/tests/spark/transformations/test_transform.py
@@ -1,11 +1,15 @@
from typing import Any, Dict
+import pytest
+
from pyspark.sql import DataFrame
from pyspark.sql import functions as f
from koheesio.logger import LoggingFactory
from koheesio.steps.transformations.transform import Transform
+pytestmark = pytest.mark.spark
+
log = LoggingFactory.get_logger(name="test_transform")
diff --git a/test/spark/transformations/test_transformation.py b/tests/spark/transformations/test_transformation.py
similarity index 99%
rename from test/spark/transformations/test_transformation.py
rename to tests/spark/transformations/test_transformation.py
index 5659984c..74826764 100644
--- a/test/spark/transformations/test_transformation.py
+++ b/tests/spark/transformations/test_transformation.py
@@ -1,4 +1,5 @@
import pytest
+
from pyspark.sql import Column
from pyspark.sql import functions as f
@@ -9,6 +10,8 @@
from koheesio.steps.transformations.dummy import DummyTransformation
from koheesio.utils import SparkDatatype
+pytestmark = pytest.mark.spark
+
def test_transform(dummy_df):
tf = DummyTransformation()
diff --git a/test/spark/transformations/test_uuid5.py b/tests/spark/transformations/test_uuid5.py
similarity index 98%
rename from test/spark/transformations/test_uuid5.py
rename to tests/spark/transformations/test_uuid5.py
index 2154f90b..3d864b75 100644
--- a/test/spark/transformations/test_uuid5.py
+++ b/tests/spark/transformations/test_uuid5.py
@@ -3,6 +3,8 @@
from koheesio.logger import LoggingFactory
from koheesio.steps.transformations.uuid5 import HashUUID5
+pytestmark = pytest.mark.spark
+
log = LoggingFactory.get_logger(name="test_uuid5")
diff --git a/test/spark/writers/delta/test_delta_writer.py b/tests/spark/writers/delta/test_delta_writer.py
similarity index 99%
rename from test/spark/writers/delta/test_delta_writer.py
rename to tests/spark/writers/delta/test_delta_writer.py
index c22ca3fc..576e9e05 100644
--- a/test/spark/writers/delta/test_delta_writer.py
+++ b/tests/spark/writers/delta/test_delta_writer.py
@@ -4,7 +4,9 @@
import pytest
from conftest import await_job_completion
from delta import DeltaTable
+
from pydantic import ValidationError
+
from pyspark.sql import functions as F
from koheesio.steps.delta import DeltaTableStep
@@ -14,6 +16,8 @@
from koheesio.steps.writers.delta.utils import log_clauses
from koheesio.steps.writers.stream import Trigger
+pytestmark = pytest.mark.spark
+
@pytest.mark.parametrize(
"output_mode,expected_count",
diff --git a/test/spark/writers/delta/test_scd.py b/tests/spark/writers/delta/test_scd.py
similarity index 99%
rename from test/spark/writers/delta/test_scd.py
rename to tests/spark/writers/delta/test_scd.py
index 72ef9fa7..7a79e45b 100644
--- a/test/spark/writers/delta/test_scd.py
+++ b/tests/spark/writers/delta/test_scd.py
@@ -1,9 +1,12 @@
import datetime
from typing import List, Optional
+import pytest
from delta import DeltaTable
from delta.tables import DeltaMergeBuilder
+
from pydantic import Field
+
from pyspark.sql import Column
from pyspark.sql import functions as F
from pyspark.sql.types import Row
@@ -12,6 +15,8 @@
from koheesio.steps.spark import DataFrame, current_timestamp_utc
from koheesio.steps.writers.delta.scd import SCD2DeltaTableWriter
+pytestmark = pytest.mark.spark
+
def test_scd2_custom_logic(spark):
def _get_result(target_df: DataFrame, expr: str):
diff --git a/test/spark/writers/test_buffer.py b/tests/spark/writers/test_buffer.py
similarity index 99%
rename from test/spark/writers/test_buffer.py
rename to tests/spark/writers/test_buffer.py
index 51e87f3a..62a25167 100644
--- a/test/spark/writers/test_buffer.py
+++ b/tests/spark/writers/test_buffer.py
@@ -3,6 +3,7 @@
from importlib.util import find_spec
import pytest
+
from pyspark.sql.types import (
BooleanType,
FloatType,
@@ -14,6 +15,8 @@
from koheesio.steps.writers.buffer import PandasCsvBufferWriter, PandasJsonBufferWriter
+pytestmark = pytest.mark.spark
+
# Test data has two columns: email and sha256_email
test_data = [
(
diff --git a/test/spark/writers/test_dummy.py b/tests/spark/writers/test_dummy.py
similarity index 94%
rename from test/spark/writers/test_dummy.py
rename to tests/spark/writers/test_dummy.py
index 831b6372..69f9ea63 100644
--- a/test/spark/writers/test_dummy.py
+++ b/tests/spark/writers/test_dummy.py
@@ -2,6 +2,8 @@
from koheesio.steps.writers.dummy import DummyWriter
+pytestmark = pytest.mark.spark
+
expected = {"id": 0}
diff --git a/test/spark/writers/test_sftp.py b/tests/spark/writers/test_sftp.py
similarity index 99%
rename from test/spark/writers/test_sftp.py
rename to tests/spark/writers/test_sftp.py
index 685236fc..c0e454e7 100644
--- a/test/spark/writers/test_sftp.py
+++ b/tests/spark/writers/test_sftp.py
@@ -12,6 +12,8 @@
SFTPWriter,
)
+pytestmark = pytest.mark.spark
+
@pytest.fixture
def buffer_writer(spark):
diff --git a/test/spark/writers/test_stream.py b/tests/spark/writers/test_stream.py
similarity index 98%
rename from test/spark/writers/test_stream.py
rename to tests/spark/writers/test_stream.py
index 419f3dae..dc125b7c 100644
--- a/test/spark/writers/test_stream.py
+++ b/tests/spark/writers/test_stream.py
@@ -4,6 +4,8 @@
from koheesio.steps.writers.stream import Trigger
+pytestmark = pytest.mark.spark
+
@pytest.mark.parametrize(
"args, expected",
diff --git a/test/steps/asyncio/test_asyncio_http.py b/tests/steps/asyncio/test_asyncio_http.py
similarity index 99%
rename from test/steps/asyncio/test_asyncio_http.py
rename to tests/steps/asyncio/test_asyncio_http.py
index 09ff0f78..acf4a76f 100644
--- a/test/steps/asyncio/test_asyncio_http.py
+++ b/tests/steps/asyncio/test_asyncio_http.py
@@ -3,9 +3,10 @@
import pytest
from aiohttp import ClientResponseError, ClientSession, TCPConnector
from aiohttp_retry import ExponentialRetry
-from pydantic import ValidationError
from yarl import URL
+from pydantic import ValidationError
+
from koheesio.steps.asyncio.http import AsyncHttpStep
from koheesio.steps.http import HttpMethod
diff --git a/test/steps/integrations/notifications/test_slack.py b/tests/steps/integrations/notifications/test_slack.py
similarity index 100%
rename from test/steps/integrations/notifications/test_slack.py
rename to tests/steps/integrations/notifications/test_slack.py
diff --git a/test/steps/integrations/sso/test_okta.py b/tests/steps/integrations/sso/test_okta.py
similarity index 97%
rename from test/steps/integrations/sso/test_okta.py
rename to tests/steps/integrations/sso/test_okta.py
index 7b31373f..09a5a608 100644
--- a/test/steps/integrations/sso/test_okta.py
+++ b/tests/steps/integrations/sso/test_okta.py
@@ -2,9 +2,10 @@
from io import StringIO
import pytest
-from pydantic import SecretStr
from requests_mock.mocker import Mocker
+from pydantic import SecretStr
+
from koheesio.steps.integrations.sso import okta as o
@@ -45,7 +46,7 @@ def test_log_extra_params_secret(self):
log_capture_string = StringIO()
ch = logging.StreamHandler(log_capture_string)
ch.setLevel(logging.DEBUG)
- logger = logging.getLogger("test")
+ logger = logging.getLogger("tests")
logger.addHandler(ch)
secret_val = "secret_value"
oat = o.OktaAccessToken(
diff --git a/test/steps/test_http.py b/tests/steps/test_http.py
similarity index 100%
rename from test/steps/test_http.py
rename to tests/steps/test_http.py
diff --git a/test/utils/test_utils.py b/tests/utils/test_utils.py
similarity index 99%
rename from test/utils/test_utils.py
rename to tests/utils/test_utils.py
index 22919506..9b2acbaa 100644
--- a/test/utils/test_utils.py
+++ b/tests/utils/test_utils.py
@@ -2,6 +2,7 @@
from unittest.mock import patch
import pytest
+
from pyspark.sql.types import StringType, StructField, StructType
from koheesio.utils import (