diff --git a/docs/references/orchestration/prefect/flows.md b/docs/references/orchestration/prefect/flows.md
index e1c291f72..0ccc75105 100644
--- a/docs/references/orchestration/prefect/flows.md
+++ b/docs/references/orchestration/prefect/flows.md
@@ -43,3 +43,5 @@
 ::: viadot.orchestration.prefect.flows.sql_server_to_minio
 
 ::: viadot.orchestration.prefect.flows.sql_server_to_parquet
+
+::: viadot.orchestration.prefect.flows.azure_sql_to_adls
diff --git a/docs/references/orchestration/prefect/tasks.md b/docs/references/orchestration/prefect/tasks.md
index 67e6cccdb..002c8852f 100644
--- a/docs/references/orchestration/prefect/tasks.md
+++ b/docs/references/orchestration/prefect/tasks.md
@@ -43,3 +43,5 @@
 ::: viadot.orchestration.prefect.tasks.sql_server_query
 
 ::: viadot.orchestration.prefect.tasks.sql_server_to_df
+
+::: viadot.orchestration.prefect.tasks.azure_sql_to_df
diff --git a/docs/references/sources/sql.md b/docs/references/sources/sql.md
index 80b107ef3..114bf97a2 100644
--- a/docs/references/sources/sql.md
+++ b/docs/references/sources/sql.md
@@ -23,3 +23,5 @@
 ::: viadot.sources.sap_rfc.SAPRFC
 
 ::: viadot.sources.sap_rfc.SAPRFCV2
+
+::: viadot.sources.azure_sql.AzureSQL
diff --git a/src/viadot/orchestration/prefect/flows/__init__.py b/src/viadot/orchestration/prefect/flows/__init__.py
index a2d489ab9..fce71f78f 100644
--- a/src/viadot/orchestration/prefect/flows/__init__.py
+++ b/src/viadot/orchestration/prefect/flows/__init__.py
@@ -1,5 +1,6 @@
 """Import flows."""
 
+from .azure_sql_to_adls import azure_sql_to_adls
 from .bigquery_to_adls import bigquery_to_adls
 from .cloud_for_customers_to_adls import cloud_for_customers_to_adls
 from .cloud_for_customers_to_databricks import cloud_for_customers_to_databricks
@@ -34,6 +35,7 @@
 
 __all__ = [
+    "azure_sql_to_adls",
     "bigquery_to_adls",
     "cloud_for_customers_to_adls",
     "cloud_for_customers_to_databricks",
diff --git a/src/viadot/orchestration/prefect/flows/azure_sql_to_adls.py b/src/viadot/orchestration/prefect/flows/azure_sql_to_adls.py
new file mode 100644
index 000000000..591ffe3e8
--- /dev/null
+++ b/src/viadot/orchestration/prefect/flows/azure_sql_to_adls.py
@@ -0,0 +1,77 @@
+"""Flows for downloading data from Azure SQL and uploading it to Azure ADLS."""
+
+from typing import Any
+
+from prefect import flow
+from prefect.task_runners import ConcurrentTaskRunner
+
+from viadot.orchestration.prefect.tasks import azure_sql_to_df, df_to_adls
+
+
+@flow(
+    name="Azure SQL extraction to ADLS",
+    description="Extract data from Azure SQL"
+    + " and load it into Azure Data Lake Storage.",
+    retries=1,
+    retry_delay_seconds=60,
+    task_runner=ConcurrentTaskRunner,
+    log_prints=True,
+)
+def azure_sql_to_adls(
+    query: str | None = None,
+    credentials_secret: str | None = None,
+    validate_df_dict: dict[str, Any] | None = None,
+    convert_bytes: bool = False,
+    remove_special_characters: bool | None = None,
+    columns_to_clean: list[str] | None = None,
+    adls_config_key: str | None = None,
+    adls_azure_key_vault_secret: str | None = None,
+    adls_path: str | None = None,
+    adls_path_overwrite: bool = False,
+) -> None:
+    r"""Download data from Azure SQL and upload it to Azure Data Lake Storage.
+
+    Args:
+        query (str, optional): Query to perform on a database. Defaults to None.
+        credentials_secret (str, optional): The name of the Azure Key Vault
+            secret containing a dictionary with database credentials.
+            Defaults to None.
+        validate_df_dict (dict[str, Any], optional): A dictionary with an optional
+            list of tests to verify the output dataframe. If defined, triggers
+            the `validate_df` task from task_utils. Defaults to None.
+        convert_bytes (bool, optional): Whether to convert bytes columns to int
+            using `df_converts_bytes_to_int`, as pulling bytes data can lead to
+            malformed values in the data frame. Defaults to False.
+        remove_special_characters (bool, optional): Whether to remove special
+            characters (e.g. escape symbols) from the data. Defaults to None.
+        columns_to_clean (list[str], optional): Columns to clean, used together
+            with remove_special_characters. If None, the whole data frame is
+            processed. Defaults to None.
+        adls_config_key (Optional[str], optional): The key in the viadot config
+            holding relevant credentials. Defaults to None.
+        adls_azure_key_vault_secret (Optional[str], optional): The name of the
+            Azure Key Vault secret containing a dictionary with ACCOUNT_NAME and
+            Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET)
+            for the Azure Data Lake. Defaults to None.
+        adls_path (Optional[str], optional): Azure Data Lake destination file path
+            (with file name). Defaults to None.
+        adls_path_overwrite (bool, optional): Whether to overwrite the file in
+            ADLS. Defaults to False.
+    """
+    data_frame = azure_sql_to_df(
+        query=query,
+        credentials_secret=credentials_secret,
+        validate_df_dict=validate_df_dict,
+        convert_bytes=convert_bytes,
+        remove_special_characters=remove_special_characters,
+        columns_to_clean=columns_to_clean,
+    )
+
+    return df_to_adls(
+        df=data_frame,
+        path=adls_path,
+        credentials_secret=adls_azure_key_vault_secret,
+        config_key=adls_config_key,
+        overwrite=adls_path_overwrite,
+    )
diff --git a/src/viadot/orchestration/prefect/tasks/__init__.py b/src/viadot/orchestration/prefect/tasks/__init__.py
index db43224a7..7250c17f4 100644
--- a/src/viadot/orchestration/prefect/tasks/__init__.py
+++ b/src/viadot/orchestration/prefect/tasks/__init__.py
@@ -1,6 +1,7 @@
 """Imports."""
 
 from .adls import adls_upload, df_to_adls
+from .azure_sql import azure_sql_to_df
 from .bcp import bcp
 from .bigquery import bigquery_to_df
 from .cloud_for_customers import cloud_for_customers_to_df
@@ -31,6 +32,7 @@
 
 __all__ = [
     "adls_upload",
+    "azure_sql_to_df",
     "bcp",
     "clone_repo",
diff --git a/src/viadot/orchestration/prefect/tasks/azure_sql.py b/src/viadot/orchestration/prefect/tasks/azure_sql.py
new file mode 100644
index 000000000..f065b680d
--- /dev/null
+++ b/src/viadot/orchestration/prefect/tasks/azure_sql.py
@@ -0,0 +1,70 @@
+"""Task for downloading data from Azure SQL."""
+
+from typing import Any, Literal
+
+import pandas as pd
+from prefect import task
+
+from viadot.orchestration.prefect.utils import get_credentials
+from viadot.sources import AzureSQL
+from viadot.utils import validate
+
+
+@task(retries=3, retry_delay_seconds=10, timeout_seconds=60 * 60)
+def azure_sql_to_df(
+    query: str | None = None,
+    credentials_secret: str | None = None,
+    validate_df_dict: dict[str, Any] | None = None,
+    convert_bytes: bool = False,
+    remove_special_characters: bool | None = None,
+    columns_to_clean: list[str] | None = None,
+    if_empty: Literal["warn", "skip", "fail"] = "warn",
+) -> pd.DataFrame:
+    r"""Task to download data from Azure SQL.
+
+    Args:
+        query (str, optional): Query to perform on a database. Defaults to None.
+        credentials_secret (str, optional): The name of the Azure Key Vault
+            secret containing a dictionary with database credentials.
+            Defaults to None.
+        validate_df_dict (dict[str, Any], optional): A dictionary with an optional
+            list of tests to verify the output dataframe. If defined, triggers
+            the `validate_df` task from task_utils. Defaults to None.
+        convert_bytes (bool, optional): Whether to convert bytes columns to int
+            using `df_converts_bytes_to_int`, as pulling bytes data can lead to
+            malformed values in the data frame. Defaults to False.
+        remove_special_characters (bool, optional): Whether to remove special
+            characters (e.g. escape symbols) from the data. Defaults to None.
+        columns_to_clean (list[str], optional): Columns to clean, used together
+            with remove_special_characters. If None, the whole data frame is
+            processed. Defaults to None.
+        if_empty (Literal["warn", "skip", "fail"], optional): What to do if the
+            query returns no data. Defaults to "warn".
+
+    Raises:
+        ValueError: If `credentials_secret` is not provided.
+
+    Returns:
+        pd.DataFrame: The response data as a pandas DataFrame.
+    """
+    if not credentials_secret:
+        msg = "`credentials_secret` has to be specified and not empty."
+        raise ValueError(msg)
+
+    credentials = get_credentials(credentials_secret)
+
+    azure_sql = AzureSQL(credentials=credentials)
+
+    df = azure_sql.to_df(
+        query=query,
+        if_empty=if_empty,
+        convert_bytes=convert_bytes,
+        remove_special_characters=remove_special_characters,
+        columns_to_clean=columns_to_clean,
+    )
+
+    if validate_df_dict is not None:
+        validate(df=df, tests=validate_df_dict)
+
+    return df
diff --git a/src/viadot/orchestration/prefect/tasks/task_utils.py b/src/viadot/orchestration/prefect/tasks/task_utils.py
index e6f958311..4f4b2a6d6 100644
--- a/src/viadot/orchestration/prefect/tasks/task_utils.py
+++ b/src/viadot/orchestration/prefect/tasks/task_utils.py
@@ -19,8 +19,8 @@ def dtypes_to_json_task(dtypes_dict: dict[str, Any], local_json_path: str) -> No
         dtypes_dict (dict): Dictionary containing data types.
         local_json_path (str): Path to local json file.
     """
-    with Path(local_json_path).open("w") as fp:
-        json.dump(dtypes_dict, fp)
+    with Path(local_json_path).open("w") as file_path:
+        json.dump(dtypes_dict, file_path)
 
 
 @task
@@ -59,7 +59,7 @@ def get_sql_dtypes_from_df(df: pd.DataFrame) -> dict:
         "Categorical": "VARCHAR(500)",
         "Time": "TIME",
         "Boolean": "VARCHAR(5)",  # Bool is True/False, Microsoft expects 0/1
-        "DateTime": "DATETIMEOFFSET",  # DATETIMEOFFSET is the only timezone-aware dtype in TSQL
+        "DateTime": "DATETIMEOFFSET",  # the only timezone-aware dtype in TSQL
         "Object": "VARCHAR(500)",
         "EmailAddress": "VARCHAR(50)",
         "File": None,
@@ -73,7 +73,7 @@ def get_sql_dtypes_from_df(df: pd.DataFrame) -> dict:
         "String": "VARCHAR(500)",
         "IPAddress": "VARCHAR(39)",
         "Path": "VARCHAR(255)",
-        "TimeDelta": "VARCHAR(20)",  # datetime.datetime.timedelta; eg. '1 days 11:00:00'
+        "TimeDelta": "VARCHAR(20)",  # datetime.timedelta, e.g. '1 days 11:00:00'
         "URL": "VARCHAR(255)",
         "Count": "INT",
     }
@@ -209,36 +209,3 @@ def union_dfs_task(dfs: list[pd.DataFrame]) -> pd.DataFrame:
         different size of DataFrames NaN values can appear.
     """
     return pd.concat(dfs, ignore_index=True)
-
-
-@task
-def df_clean_column(
-    df: pd.DataFrame, columns_to_clean: list[str] | None = None
-) -> pd.DataFrame:
-    """Remove special characters from a pandas DataFrame.
-
-    Args:
-        df (pd.DataFrame): The DataFrame to clean.
-        columns_to_clean (List[str]): A list of columns to clean. Defaults is None.
-
-    Returns:
-        pd.DataFrame: The cleaned DataFrame
-    """
-    df = df.copy()
-
-    if columns_to_clean is None:
-        df.replace(
-            to_replace=[r"\\t|\\n|\\r", "\t|\n|\r"],
-            value=["", ""],
-            regex=True,
-            inplace=True,
-        )
-    else:
-        for col in columns_to_clean:
-            df[col].replace(
-                to_replace=[r"\\t|\\n|\\r", "\t|\n|\r"],
-                value=["", ""],
-                regex=True,
-                inplace=True,
-            )
-    return df
diff --git a/src/viadot/sources/__init__.py b/src/viadot/sources/__init__.py
index 08615a9f3..c8501d90d 100644
--- a/src/viadot/sources/__init__.py
+++ b/src/viadot/sources/__init__.py
@@ -4,6 +4,7 @@
 from ._duckdb import DuckDB
 from ._trino import Trino
+from .azure_sql import AzureSQL
 from .bigquery import BigQuery
 from .cloud_for_customers import CloudForCustomers
 from .customer_gauge import CustomerGauge
@@ -24,6 +25,7 @@
 
 __all__ = [
+    "AzureSQL",
     "BigQuery",
     "CloudForCustomers",
     "CustomerGauge",
diff --git a/src/viadot/sources/azure_sql.py b/src/viadot/sources/azure_sql.py
new file mode 100644
index 000000000..e6570b83a
--- /dev/null
+++ b/src/viadot/sources/azure_sql.py
@@ -0,0 +1,165 @@
+"""A connector for Azure SQL Database."""
+
+import logging
+from typing import Literal
+
+import pandas as pd
+
+from viadot.utils import df_clean_column, df_converts_bytes_to_int
+
+from .sql_server import SQLServer
+
+
+logger = logging.getLogger(__name__)
+
+
+class AzureSQL(SQLServer):
+    """Azure SQL connector class."""
+
+    def __init__(self, *args, config_key: str = "AZURE_SQL", **kwargs):
+        """Initialize the AzureSQL connector.
+
+        This constructor sets up the Azure SQL connector with the specified
+        configuration key. It allows for additional positional and keyword
+        arguments to be passed to the parent SQLServer class.
+
+        Args:
+            *args: Variable length argument list passed to the parent class.
+            config_key (str, optional): The configuration key used to retrieve
+                connection settings. Defaults to "AZURE_SQL".
+            **kwargs: Additional keyword arguments passed to the parent class.
+        """
+        super().__init__(*args, config_key=config_key, **kwargs)
+
+    def bulk_insert(
+        self,
+        table: str,
+        schema: str | None = None,
+        source_path: str | None = None,
+        sep: str | None = "\t",
+        if_exists: Literal["append", "replace"] = "append",
+    ) -> bool:
+        r"""Bulk insert data from a file into an Azure SQL table.
+
+        Args:
+            table (str): Table name.
+            schema (str, optional): Schema name. Defaults to None.
+            source_path (str, optional): Full path to a data file. Defaults to None.
+            sep (str, optional): Field terminator to be used for char and
+                widechar data files. Defaults to "\t".
+            if_exists (Literal["append", "replace"], optional): What to do if the
+                table already exists. Defaults to "append".
+        """
+        if schema is None:
+            schema = self.DEFAULT_SCHEMA
+        fqn = f"{schema}.{table}"
+        insert_sql = f"""
+        BULK INSERT {fqn} FROM '{source_path}'
+        WITH (
+            CHECK_CONSTRAINTS,
+            DATA_SOURCE='{self.credentials['data_source']}',
+            DATAFILETYPE='char',
+            FIELDTERMINATOR='{sep}',
+            ROWTERMINATOR='0x0a',
+            FIRSTROW=2,
+            KEEPIDENTITY,
+            TABLOCK,
+            CODEPAGE='65001'
+        );
+        """
+        if if_exists == "replace":
+            self.run(f"DELETE FROM {schema}.{table}")  # noqa: S608
+        self.run(insert_sql)
+        return True
+
+    def create_external_database(
+        self,
+        external_database_name: str,
+        storage_account_name: str,
+        container_name: str,
+        sas_token: str,
+        master_key_password: str,
+        credential_name: str | None = None,
+    ) -> None:
+        """Create an external database.
+
+        Used to e.g. execute BULK INSERT or OPENROWSET queries.
+
+        Args:
+            external_database_name (str): The name of the external source (db)
+                to be created.
+            storage_account_name (str): The name of the Azure storage account.
+            container_name (str): The name of the container which should
+                become the "database".
+            sas_token (str): The SAS token to be used as the credential.
+                Note that depending on your setup, you may need to provide your
+                storage account's account key here instead.
+            master_key_password (str): The password for the database master key
+                of your Azure SQL Database.
+            credential_name (str, optional): The name under which to store the
+                SAS credential. It can be any valid identifier. Defaults to
+                `{external_database_name}_credential`.
+        """
+        # Strip the leading "?" that is sometimes included in SAS tokens.
+        if sas_token.startswith("?"):
+            sas_token = sas_token[1:]
+
+        if credential_name is None:
+            credential_name = f"{external_database_name}_credential"
+
+        create_master_key_sql = (
+            f"CREATE MASTER KEY ENCRYPTION BY PASSWORD = '{master_key_password}'"
+        )
+
+        create_external_db_credential_sql = f"""
+        CREATE DATABASE SCOPED CREDENTIAL {credential_name}
+        WITH IDENTITY = 'SHARED ACCESS SIGNATURE'
+        SECRET = '{sas_token}';
+        """
+
+        location = (
+            f"https://{storage_account_name}.blob.core.windows.net/{container_name}"
+        )
+        create_external_db_sql = f"""
+        CREATE EXTERNAL DATA SOURCE {external_database_name} WITH (
+            LOCATION = '{location}',
+            CREDENTIAL = {credential_name}
+        );
+        """
+
+        self.run(create_master_key_sql)
+        self.run(create_external_db_credential_sql)
+        self.run(create_external_db_sql)
+
+    def to_df(
+        self,
+        query: str,
+        if_empty: Literal["warn", "skip", "fail"] = "warn",
+        convert_bytes: bool = False,
+        remove_special_characters: bool | None = None,
+        columns_to_clean: list[str] | None = None,
+    ) -> pd.DataFrame:
+        """Execute a query and return the result as a pandas DataFrame.
+
+        Args:
+            query (str): The query to execute.
+            if_empty (Literal["warn", "skip", "fail"], optional): What to do if
+                the query returns no data. Defaults to "warn".
+            convert_bytes (bool, optional): Whether to convert bytes columns to
+                int using `df_converts_bytes_to_int`, as pulling bytes data can
+                lead to malformed values in the data frame. Defaults to False.
+            remove_special_characters (bool, optional): Whether to remove special
+                characters (e.g. escape symbols) from the data. Defaults to None.
+            columns_to_clean (list[str], optional): Columns to clean, used
+                together with remove_special_characters. If None, the whole data
+                frame is processed. Defaults to None.
+
+        Returns:
+            pd.DataFrame: The query result as a pandas DataFrame.
+        """
+        df = super().to_df(query=query, if_empty=if_empty)
+
+        if convert_bytes:
+            df = df_converts_bytes_to_int(df=df)
+
+        if remove_special_characters:
+            df = df_clean_column(df=df, columns_to_clean=columns_to_clean)
+
+        return df
diff --git a/src/viadot/utils.py b/src/viadot/utils.py
index a7916ae92..ec581d9cd 100644
--- a/src/viadot/utils.py
+++ b/src/viadot/utils.py
@@ -962,3 +962,47 @@ def anonymize_df(
     df.drop(columns=["temp_date_col"], inplace=True, errors="ignore")
 
     return df
+
+
+def df_converts_bytes_to_int(df: pd.DataFrame) -> pd.DataFrame:
+    """Convert bytes values in a DataFrame to int.
+
+    Args:
+        df (pd.DataFrame): The DataFrame to convert.
+
+    Returns:
+        pd.DataFrame: The converted DataFrame.
+    """
+    return df.map(lambda x: int(x) if isinstance(x, bytes) else x)
+
+
+def df_clean_column(
+    df: pd.DataFrame, columns_to_clean: list[str] | None = None
+) -> pd.DataFrame:
+    """Remove special characters from a pandas DataFrame.
+
+    Args:
+        df (pd.DataFrame): The DataFrame to clean.
+        columns_to_clean (list[str], optional): A list of columns to clean.
+            Defaults to None.
+
+    Returns:
+        pd.DataFrame: The cleaned DataFrame.
+    """
+    df = df.copy()
+
+    if columns_to_clean is None:
+        df.replace(
+            to_replace=[r"\\t|\\n|\\r", "\t|\n|\r"],
+            value=["", ""],
+            regex=True,
+            inplace=True,
+        )
+    else:
+        for col in columns_to_clean:
+            df[col] = df[col].replace(
+                to_replace=[r"\\t|\\n|\\r", "\t|\n|\r"],
+                value=["", ""],
+                regex=True,
+            )
+    return df
diff --git a/tests/integration/orchestration/prefect/flows/test_azure_sql_to_adls.py b/tests/integration/orchestration/prefect/flows/test_azure_sql_to_adls.py
new file mode 100644
index 000000000..44f5c78bb
--- /dev/null
+++ b/tests/integration/orchestration/prefect/flows/test_azure_sql_to_adls.py
@@ -0,0 +1,88 @@
+from unittest.mock import patch
+
+import pandas as pd
+import pytest
+
+from viadot.orchestration.prefect.flows import azure_sql_to_adls
+
+
+@pytest.fixture
+def query():
+    return "SELECT * FROM your_table_name"
+
+
+@pytest.fixture
+def TEST_FILE_PATH():
+    return "test_file_path"
+
+
+@pytest.fixture
+def adls_credentials_secret():
+    return "mock_adls_credentials_secret"
+
+
+@pytest.fixture
+def azure_sql_credentials_secret():
+    return "mock_azure_sql_credentials_secret"
+
+
+def test_azure_sql_to_adls(
+    query,
+    TEST_FILE_PATH,
+    adls_credentials_secret,
+    azure_sql_credentials_secret,
+):
+    # Patch the task objects in the flow module so the flow uses the mocks.
+    with (
+        patch(
+            "viadot.orchestration.prefect.flows.azure_sql_to_adls.azure_sql_to_df"
+        ) as mock_azure_sql_to_df,
+        patch(
+            "viadot.orchestration.prefect.flows.azure_sql_to_adls.df_to_adls"
+        ) as mock_df_to_adls,
+    ):
+        mock_df = pd.DataFrame({"column1": [1, 2], "column2": [3, 4]})
+        mock_azure_sql_to_df.return_value = mock_df
+
+        # Call the flow
+        azure_sql_to_adls(
+            query=query,
+            credentials_secret=azure_sql_credentials_secret,
+            validate_df_dict=None,
+            convert_bytes=False,
+            remove_special_characters=None,
+            columns_to_clean=None,
+            adls_config_key="adls_test",
+            adls_azure_key_vault_secret=adls_credentials_secret,
+            adls_path=TEST_FILE_PATH,
+            adls_path_overwrite=True,
+        )
+
+        # Assert that azure_sql_to_df was called with the arguments the flow passes
+        mock_azure_sql_to_df.assert_called_once_with(
+            query=query,
+            credentials_secret=azure_sql_credentials_secret,
+            validate_df_dict=None,
+            convert_bytes=False,
+            remove_special_characters=None,
+            columns_to_clean=None,
+        )
+
+        # Assert that df_to_adls was called with the correct arguments
+        mock_df_to_adls.assert_called_once_with(
+            df=mock_df,
+            path=TEST_FILE_PATH,
+            credentials_secret=adls_credentials_secret,
+            config_key="adls_test",
+            overwrite=True,
+        )
diff --git a/tests/unit/test_azure_sql.py b/tests/unit/test_azure_sql.py
new file mode 100644
index 000000000..5b419cf97
--- /dev/null
+++ b/tests/unit/test_azure_sql.py
@@ -0,0 +1,73 @@
+import pytest
+
+from viadot.sources.azure_sql import AzureSQL
+from viadot.sources.sql_server import SQLServerCredentials
+
+
+@pytest.fixture
+def azure_sql_credentials():
+    return SQLServerCredentials(
+        user="test_user",
+        password="test_password",  # pragma: allowlist secret # noqa: S106
+        server="localhost",
+        db_name="test_db",
+        driver="ODBC Driver 17 for SQL Server",
+    )
+
+
+@pytest.fixture
+def azure_sql(azure_sql_credentials: SQLServerCredentials, mocker):
+    mocker.patch("viadot.sources.base.SQL.con", return_value=True)
+
+    return AzureSQL(
+        credentials={
+            "user": azure_sql_credentials.user,
+            "password": azure_sql_credentials.password,
+            "server": azure_sql_credentials.server,
+            "db_name": azure_sql_credentials.db_name,
+            "data_source": "test_data_source",
+        }
+    )
+
+
+def test_azure_sql_initialization(azure_sql):
+    """Test that the AzureSQL object is initialized with the correct credentials."""
+    assert azure_sql.credentials["server"] == "localhost"
+    assert azure_sql.credentials["user"] == "test_user"
+    assert (
+        azure_sql.credentials["password"]
+        == "test_password"  # pragma: allowlist secret # noqa: S105
+    )
+    assert azure_sql.credentials["db_name"] == "test_db"
+
+
+def test_create_external_database(azure_sql, mocker):
+    """Test the create_external_database function."""
+    mock_run = mocker.patch("viadot.sources.base.SQL.run", return_value=True)
+
+    # Test parameters
+    external_database_name = "test_external_db"
+    storage_account_name = "test_storage_account"
+    container_name = "test_container"
+    sas_token = "test_sas_token"  # noqa: S105
+    master_key_password = (
+        "test_master_key_password"  # pragma: allowlist secret # noqa: S105
+    )
+    credential_name = "custom_credential_name"
+
+    azure_sql.create_external_database(
+        external_database_name=external_database_name,
+        storage_account_name=storage_account_name,
+        container_name=container_name,
+        sas_token=sas_token,
+        master_key_password=master_key_password,
+        credential_name=credential_name,
+    )
+
+    # Expected SQL command with the custom credential name
+    expected_master_key_sql = (
+        f"CREATE MASTER KEY ENCRYPTION BY PASSWORD = '{master_key_password}'"
+    )
+
+    mock_run.assert_any_call(expected_master_key_sql)
+    assert mock_run.call_count == 3  # Ensure all 3 SQL commands were executed
diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py
index ec2c74e8a..2b2b716e6 100644
--- a/tests/unit/test_utils.py
+++ b/tests/unit/test_utils.py
@@ -8,6 +8,8 @@
 from viadot.utils import (
     _cast_df_cols,
     add_viadot_metadata_columns,
+    df_clean_column,
+    df_converts_bytes_to_int,
     gen_bulk_insert_query_from_df,
     get_fqn,
     handle_api_request,
@@ -282,3 +284,56 @@ def test_validate_and_reorder_different_order_columns():
     assert result[0].equals(df1)
     assert list(result[1].columns) == list(expected_df2.columns)
     assert result[1].equals(expected_df2)
+
+
+def test_df_converts_bytes_to_int():
+    df_bytes = pd.DataFrame(
+        {
+            "A": [b"1", b"2", b"3"],
+            "B": [b"4", b"5", b"6"],
+            "C": ["no change", "still no change", "me neither"],
+        }
+    )
+
+    result = df_converts_bytes_to_int(df_bytes)
+
+    expected = pd.DataFrame(
+        {
+            "A": [1, 2, 3],
+            "B": [4, 5, 6],
+            "C": ["no change", "still no change", "me neither"],
+        }
+    )
+
+    pd.testing.assert_frame_equal(result, expected)
+
+    df_no_bytes = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
+
+    result_no_bytes = df_converts_bytes_to_int(df_no_bytes)
+
+    pd.testing.assert_frame_equal(result_no_bytes, df_no_bytes)
+
+
+def test_df_clean_column():
+    df_dirty = pd.DataFrame(
+        {
+            "A": ["Hello\tWorld", "Goodbye\nWorld"],
+            "B": ["Keep\nIt\tClean", "Just\tTest"],
+        }
+    )
+
+    cleaned_df = df_clean_column(df_dirty, columns_to_clean=["A"])
+
+    expected_cleaned_df = pd.DataFrame(
+        {"A": ["HelloWorld", "GoodbyeWorld"], "B": ["Keep\nIt\tClean", "Just\tTest"]}
+    )
+
+    pd.testing.assert_frame_equal(cleaned_df, expected_cleaned_df)
+
+    cleaned_all_df = df_clean_column(df_dirty)
+
+    expected_all_cleaned_df = pd.DataFrame(
+        {"A": ["HelloWorld", "GoodbyeWorld"], "B": ["KeepItClean", "JustTest"]}
+    )
+
+    pd.testing.assert_frame_equal(cleaned_all_df, expected_all_cleaned_df)
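
For context, a minimal sketch of how the new flow might be invoked once this change is merged. The query, Key Vault secret names, and ADLS path below are placeholder values for illustration only; they are not defined anywhere in this PR:

    from viadot.orchestration.prefect.flows import azure_sql_to_adls

    # Hypothetical example: pull a table from Azure SQL and land it in ADLS.
    # "azure-sql-db-creds" and "adls-sp-creds" are placeholder Key Vault secret
    # names; adjust them to your environment.
    azure_sql_to_adls(
        query="SELECT * FROM sales.orders",
        credentials_secret="azure-sql-db-creds",
        adls_azure_key_vault_secret="adls-sp-creds",
        adls_path="raw/sales/orders.parquet",
        adls_path_overwrite=True,
    )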