Added categorical encoding function #127

Closed
2 changes: 2 additions & 0 deletions pyproject.toml
@@ -42,6 +42,8 @@ dependencies = [
"pytz>=2023.3",
"pyyaml>=6.0",
"tomli>=2.0.1",
"pandas>=1.5.0",
"scikit-learn>=1.2.0"
Comment on lines +45 to +46

@dannymeijer (Member), Dec 1, 2024:

I don't want to make these top-level dependencies, plus we already have pandas as an extra dependency.
Let's make an extra called "ml" and put scikit-learn in there. That way you can install the extra dependencies as koheesio[pandas,ml]
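A sketch of what the suggested extras could look like in pyproject.toml; the exact table layout and pins are assumptions based on the lines above:

```toml
[project.optional-dependencies]
pandas = [
    "pandas>=1.5.0",
]
ml = [
    "scikit-learn>=1.2.0",
]
```

Installing both would then be `pip install "koheesio[pandas,ml]"`.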

]

[project.urls]
57 changes: 57 additions & 0 deletions src/koheesio/pandas/categorical_encoding.py
@@ -0,0 +1,57 @@
```python
from sklearn.preprocessing import OneHotEncoder
import pandas as pd
from pydantic import BaseModel
from typing import List, Optional
from koheesio.steps import Step

from typing import List, Dict
import pandas as pd
```
Member:

import pandas through the koheesio module (as stated above) to avoid conflict:
`from koheesio.pandas import pandas as pd`

Please run isort (through make fmt or hatch fmt, or run ruff)

```python
from pydantic import BaseModel

class PandasCategoricalEncoding(BaseModel):
```
Member:

make this class a PandasStep: `from koheesio.pandas import PandasStep`

"""
Encodes categorical columns using one-hot encoding or ordinal encoding.

Attributes
----------
columns : List[str]
List of categorical columns to encode.
encoding_type : str, optional
Type of encoding, either "one-hot" or "ordinal" (default is "one-hot").
drop_first : bool, optional
Whether to drop the first dummy column for one-hot encoding (default is True).
ordinal_mapping : dict, optional
A dictionary mapping categorical values to integers for ordinal encoding.
"""

columns: List[str]
encoding_type: str = "one-hot"
Member:

change the type to `Literal["one-hot", "ordinal"]`, that way you don't need the extra check you put in the `__init__` method
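For illustration, a minimal stdlib sketch of the constraint a `Literal` type expresses; pydantic enforces this automatically at validation time for `Literal`-typed fields, so the manual check below exists only to show what becomes redundant:

```python
from typing import Literal, get_args

EncodingType = Literal["one-hot", "ordinal"]


def check_encoding_type(value: str) -> str:
    # pydantic performs an equivalent membership check automatically for
    # Literal-typed fields, so the manual __init__ check can be dropped
    if value not in get_args(EncodingType):
        raise ValueError(f"Invalid encoding type: {value}. Expected 'one-hot' or 'ordinal'.")
    return value
```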

```python
    drop_first: bool = True
    ordinal_mapping: Dict[str, Dict] = None
```
Comment on lines +27 to +30

Member:

Make all these Fields, for example:

```python
from koheesio.models import Field
...
class PandasCategoricalEncoding(PandasStep):

    columns: List[str] = Field(..., description="...")
    encoding_type: Literal["one-hot", "ordinal"] = Field(default="one-hot", description="...")
    ...
```

Member:

(and of course add appropriate description to each)


```python
    def __init__(self, **kwargs):
```
Member:

This becomes obsolete if you make the type a Literal as stated above

```python
        super().__init__(**kwargs)
        if self.encoding_type not in ['one-hot', 'ordinal']:
            raise ValueError(f"Invalid encoding type: {self.encoding_type}. Expected 'one-hot' or 'ordinal'.")
```

```python
    def execute(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Executes the categorical encoding transformation on the provided dataset.

        Parameters
        ----------
        data : pd.DataFrame
            The input dataset to encode.

        Returns
        -------
        pd.DataFrame
            The dataset with the specified categorical columns encoded.
        """
        if self.encoding_type == 'one-hot':
            data = pd.get_dummies(data, columns=self.columns, drop_first=self.drop_first)
        elif self.encoding_type == 'ordinal':
            for column in self.columns:
                if column in data.columns and self.ordinal_mapping and column in self.ordinal_mapping:
                    data[column] = data[column].map(self.ordinal_mapping[column]).fillna(-1).astype(int)
        return data
```
Comment on lines +37 to +57

@dannymeijer (Member), Dec 1, 2024:

A few things about execute (for when you change this to a Step):

  • execute takes no arguments; instead, add the DataFrame as one of the input Fields, something like `df: Optional[pd.DataFrame] = Field(default=None, description="...")` - I will explain why you want this as Optional in a bit
  • execute is expected to deal with input (your Fields) and generate Output
  • this Output does not need to be returned explicitly, the Step parent-class takes care of this.
  • instead, add a .transform method that can take a DataFrame as input

This means you can change your code like this:

  1. add an Output class
  2. add a .transform method
  3. update your execute method accordingly

Should look something like this (of course add docstrings and things like that):

```python
class PandasCategoricalEncoding(PandasStep):
    ...

    class Output(PandasStep.Output):
        df: pd.DataFrame = Field(..., description="output pandas DataFrame")

    def transform(self, df: Optional[pd.DataFrame] = None) -> pd.DataFrame:
        # note: pandas DataFrames raise on truthiness checks, so test for None explicitly
        self.df = df if df is not None else self.df
        if self.df is None:
            raise RuntimeError("No valid Dataframe was passed")
        self.execute()
        return self.output.df

    def execute(self) -> Output:
        if self.encoding_type == 'one-hot':
            self.output.df = pd.get_dummies(self.df, columns=self.columns, drop_first=self.drop_first)
        elif self.encoding_type == 'ordinal':
            data = self.df
            for column in self.columns:
                if column in data.columns and self.ordinal_mapping and column in self.ordinal_mapping:
                    data[column] = data[column].map(self.ordinal_mapping[column]).fillna(-1).astype(int)
            self.output.df = data
```

You can then interact with the class like this:

```python
encoding_step = PandasCategoricalEncoding(
    columns=["color"],
    encoding_type="one-hot",
    drop_first=False,  # Adjusted to match expected columns
)
encoded_data = encoding_step.transform(self.data)
```

This way, the interface matches what we do for Spark.
Note: I will work on making Transformation base classes for Pandas in a separate PR.

For reference:

````python
class Transformation(SparkStep, ABC):
    """Base class for all transformations

    Concept
    -------
    A Transformation is a Step that takes a DataFrame as input and returns a DataFrame as output. The DataFrame is
    transformed based on the logic implemented in the `execute` method. Any additional parameters that are needed for
    the transformation can be passed to the constructor.

    Parameters
    ----------
    df : Optional[DataFrame]
        The DataFrame to apply the transformation to. If not provided, the DataFrame has to be passed to the
        transform-method.

    Example
    -------
    ### Implementing a transformation using the Transformation class:

    ```python
    from koheesio.steps.transformations import Transformation
    from pyspark.sql import functions as f


    class AddOne(Transformation):
        target_column: str = "new_column"

        def execute(self):
            self.output.df = self.df.withColumn(
                self.target_column, f.col("old_column") + 1
            )
    ```

    In the example above, the `execute` method is implemented to add 1 to the values of the `old_column` and store the
    result in a new column called `new_column`.

    ### Using the transformation:

    In order to use this transformation, we can call the `transform` method:

    ```python
    from pyspark.sql import SparkSession

    # create a DataFrame with 3 rows
    df = SparkSession.builder.getOrCreate().range(3)

    output_df = AddOne().transform(df)
    ```

    The `output_df` will now contain the original DataFrame with an additional column called `new_column` with the
    values of `old_column` + 1.

    __output_df:__

    |id|new_column|
    |--|----------|
    | 0|         1|
    | 1|         2|
    | 2|         3|

    ...

    ### Alternative ways to use the transformation:

    Alternatively, we can pass the DataFrame to the constructor and call the `execute` or `transform` method without
    any arguments:

    ```python
    output_df = AddOne(df).transform()
    # or
    output_df = AddOne(df).execute().output.df
    ```

    > Note: the transform method was not implemented explicitly in the AddOne class. This is because the `transform`
    method is already implemented in the `Transformation` class. This means that all classes that inherit from the
    Transformation class will have the `transform` method available. Only the execute method needs to be implemented.

    ### Using the transformation as a function:

    The transformation can also be used as a function as part of a DataFrame's `transform` method:

    ```python
    input_df = spark.range(3)

    output_df = input_df.transform(AddOne(target_column="foo")).transform(
        AddOne(target_column="bar")
    )
    ```

    In the above example, the `AddOne` transformation is applied to the `input_df` DataFrame using the `transform`
    method. The `output_df` will now contain the original DataFrame with additional columns called `foo` and
    `bar`, each with the values of `id` + 1.
    """

    df: Optional[DataFrame] = Field(default=None, description="The Spark DataFrame")

    @abstractmethod
    def execute(self) -> SparkStep.Output:
        """Execute on a Transformation should handle self.df (input) and set self.output.df (output)

        This method should be implemented in the child class. The input DataFrame is available as `self.df` and the
        output DataFrame should be stored in `self.output.df`.

        For example:

        ```python
        def execute(self):
            self.output.df = self.df.withColumn(
                "new_column", f.col("old_column") + 1
            )
        ```

        The transform method will call this method and return the output DataFrame.
        """
        # self.df  # input dataframe
        # self.output.df  # output dataframe
        self.output.df = ...  # implement the transformation logic
        raise NotImplementedError

    def transform(self, df: Optional[DataFrame] = None) -> DataFrame:
        """Execute the transformation and return the output DataFrame

        Note: when creating a child from this, don't implement this transform method. Instead, implement execute!

        See Also
        --------
        `Transformation.execute`

        Parameters
        ----------
        df: Optional[DataFrame]
            The DataFrame to apply the transformation to. If not provided, the DataFrame passed to the constructor
            will be used.

        Returns
        -------
        DataFrame
            The transformed DataFrame
        """
        self.df = df or self.df
        if not self.df:
            raise RuntimeError("No valid Dataframe was passed")
        self.execute()
        return self.output.df

    def __call__(self, *args, **kwargs):
        """Allow the class to be called as a function.

        This is especially useful when using a DataFrame's transform method.

        Example
        -------
        ```python
        input_df = spark.range(3)

        output_df = input_df.transform(AddOne(target_column="foo")).transform(
            AddOne(target_column="bar")
        )
        ```

        In the above example, the `AddOne` transformation is applied to the `input_df` DataFrame using the `transform`
        method. The `output_df` will now contain the original DataFrame with additional columns called `foo` and
        `bar`, each with the values of `id` + 1.
        """
        return self.transform(*args, **kwargs)
````

Member:

making df an Optional type allows us to either give the df as an argument when initializing the class, or pass it through transform - this is exactly how we do it inside the Spark module at the moment

81 changes: 81 additions & 0 deletions tests/test_categorical_encoding.py
@@ -0,0 +1,81 @@
```python
import unittest
import pandas as pd
from src.koheesio.pandas.categorical_encoding import PandasCategoricalEncoding
import unittest
import pandas as pd
```
Comment on lines +2 to +5

@dannymeijer (Member), Dec 1, 2024:

you're importing pandas twice. Also, import pandas through the koheesio module to avoid conflict:
`from koheesio.pandas import pandas as pd`


```python
class TestPandasCategoricalEncoding(unittest.TestCase):
```
Member:

don't use unittest - use pytest.

  1. get rid of the unittest.TestCase (just let it be a regular class)
  2. change your self.assert... (from unittest) to regular python assert (this is how pytest works)
  3. get rid of your setUp - just make the input dataframe a module level variable, OR make it a fixture (a bit overkill for your purpose here)
  4. change your code to match the interface I proposed above
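A sketch of how points 1-3 could look for the one-hot test; the `pd.get_dummies` call below is a stand-in for the step so the snippet is self-contained, whereas the real test would go through `PandasCategoricalEncoding(...).transform(...)` per point 4:

```python
import pandas as pd

# module-level sample data instead of setUp (point 3); names mirror the
# existing test, but the exact shape of the final test is an assumption
data = pd.DataFrame({
    "color": ["red", "blue", "green", "blue", "red"],
    "size": ["S", "M", "L", "S", "XL"],
})


class TestPandasCategoricalEncoding:  # plain class, no unittest.TestCase (point 1)
    def test_one_hot_encoding(self):
        # stand-in for the step's one-hot branch (the real test would call
        # the step's transform method, point 4)
        encoded = pd.get_dummies(data, columns=["color"], drop_first=False)
        expected_columns = ["color_blue", "color_green", "color_red"]
        # bare asserts instead of self.assertTrue (point 2)
        assert all(col in encoded.columns for col in expected_columns)
```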


"""
Unit tests for the PandasCategoricalEncoding class.

Methods
-------
setUp():
Sets up the test environment with sample data.
test_one_hot_encoding():
Tests the one-hot encoding functionality.
test_ordinal_encoding():
Tests the ordinal encoding functionality.
"""

def setUp(self):
"""
Sets up the test environment with sample data.

Creates a pandas DataFrame with categorical columns for testing.
"""
self.data = pd.DataFrame({
'color': ['red', 'blue', 'green', 'blue', 'red'],
'size': ['S', 'M', 'L', 'S', 'XL']
})

def test_one_hot_encoding(self):
"""
Tests one-hot encoding functionality.

Ensures that the specified categorical columns are correctly
encoded into dummy variables, with an option to drop the first
dummy column.
"""
encoding_step = PandasCategoricalEncoding(
columns=["color"],
encoding_type="one-hot",
drop_first=False # Adjusted to match expected columns
)

# Apply encoding
encoded_data = encoding_step.execute(self.data)

# Expected columns after one-hot encoding
expected_columns = ['color_blue', 'color_green', 'color_red']

# Check if the encoded data contains expected columns
self.assertTrue(all(col in encoded_data.columns for col in expected_columns))
print("One-hot encoding passed.")

def test_ordinal_encoding(self):
"""
Tests ordinal encoding functionality.

Ensures that the specified categorical columns are correctly
encoded into ordinal values based on a provided mapping.
"""

ordinal_mapping = {
'size': {'S': 1, 'M': 2, 'L': 3, 'XL': 4}
}

encoding_step = PandasCategoricalEncoding(
columns=["size"],
encoding_type="ordinal",
ordinal_mapping=ordinal_mapping
)

# Apply encoding
encoded_data = encoding_step.execute(self.data)

# Check if the "size" column is correctly ordinal encoded
expected_values = [1, 2, 3, 1, 4]
self.assertTrue(all(encoded_data['size'] == expected_values))
print("Ordinal encoding passed.")