Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Add AzureSQL source and Prefect tasks #1043

Merged
merged 51 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
78f02d5
🚀 Adding `Aselite` connector with integration tests
Sep 18, 2024
7efdf03
🚀 Add unit tests for Aselite (Azure SQL)
Sep 19, 2024
5548206
🐛 Fix task utils bug
Sep 20, 2024
b7111a9
🎨 Refactor task utils file to pass code_checker
Sep 20, 2024
497a770
removed prefect dependency from azure_sql source
fdelgadodyvenia Sep 21, 2024
f787e97
Bug located into the return task
fdelgadodyvenia Sep 22, 2024
95705e4
♻️ Change flow and task name to `azure_sql`
Sep 26, 2024
2d99061
Update src/viadot/orchestration/prefect/flows/azure_sql_to_adls.py
adrian-wojcik Sep 26, 2024
15995ab
Update src/viadot/orchestration/prefect/flows/azure_sql_to_adls.py
adrian-wojcik Sep 26, 2024
04cf3ed
Update src/viadot/orchestration/prefect/flows/azure_sql_to_adls.py
adrian-wojcik Sep 26, 2024
30433aa
Update src/viadot/orchestration/prefect/flows/azure_sql_to_adls.py
adrian-wojcik Sep 26, 2024
228036f
Update src/viadot/orchestration/prefect/flows/azure_sql_to_adls.py
adrian-wojcik Sep 26, 2024
3a58b5e
Update src/viadot/orchestration/prefect/flows/azure_sql_to_adls.py
adrian-wojcik Sep 26, 2024
7a23b63
Update src/viadot/orchestration/prefect/tasks/azure_sql.py
adrian-wojcik Sep 26, 2024
9bac63d
Update src/viadot/orchestration/prefect/tasks/azure_sql.py
adrian-wojcik Sep 26, 2024
a32a56f
Update src/viadot/orchestration/prefect/tasks/azure_sql.py
adrian-wojcik Sep 26, 2024
c0888c0
Update src/viadot/orchestration/prefect/tasks/azure_sql.py
adrian-wojcik Sep 26, 2024
328cffb
Update src/viadot/orchestration/prefect/tasks/azure_sql.py
adrian-wojcik Sep 26, 2024
f817598
Update src/viadot/orchestration/prefect/tasks/azure_sql.py
adrian-wojcik Sep 26, 2024
a067fe4
Update tests/integration/orchestration/prefect/flows/test_azure_sql_t…
adrian-wojcik Sep 26, 2024
a3aafd8
Update tests/integration/orchestration/prefect/flows/test_azure_sql_t…
adrian-wojcik Sep 26, 2024
185620d
Update tests/integration/orchestration/prefect/flows/test_azure_sql_t…
adrian-wojcik Sep 26, 2024
3814f50
Update tests/integration/orchestration/prefect/flows/test_azure_sql_t…
adrian-wojcik Sep 26, 2024
d410424
Update tests/integration/orchestration/prefect/flows/test_azure_sql_t…
adrian-wojcik Sep 26, 2024
be0bafa
Update tests/integration/orchestration/prefect/flows/test_azure_sql_t…
adrian-wojcik Sep 26, 2024
0d6e81f
Update tests/integration/orchestration/prefect/flows/test_azure_sql_t…
adrian-wojcik Sep 26, 2024
5e25dd8
Update tests/integration/orchestration/prefect/tasks/test_azure_sql.py
adrian-wojcik Sep 26, 2024
18740fc
Update tests/integration/orchestration/prefect/tasks/test_azure_sql.py
adrian-wojcik Sep 26, 2024
b350036
🐛 Fix task tests bugs
Sep 26, 2024
b7886f7
🐛 Fix bugs in azure sql unit tests
Sep 26, 2024
d8ec4f3
🎨 Changed docstring description for parameter `convert_bytes`
Sep 27, 2024
1a66c21
🔥 Remove task tests as all of it is covered in unit tests
Sep 27, 2024
5a9a264
🎨 Improved structure of the `AzureSQL` source class and added docstring
Rafalz13 Sep 30, 2024
e2bb1a9
✅ Modified tests structure
Rafalz13 Sep 30, 2024
44d9401
🎨 Removed unused parameters and improved structure of the code
Rafalz13 Sep 30, 2024
6b2dd2b
🎨 Removed unused parameters and improved structure of the flow code
Rafalz13 Sep 30, 2024
0e0ecac
🎨 Improved structure of the tests code
Rafalz13 Sep 30, 2024
7b1687b
🎨 Improved structure of the `__init__` files
Rafalz13 Sep 30, 2024
4fc4866
🎨 Added extra spaces in `chunk_df`
Rafalz13 Sep 30, 2024
bd29ab3
🚧 Added `pragma: allowlist secret`
Rafalz13 Sep 30, 2024
edb7b62
🚧 Added `# noqa: S105`
Rafalz13 Sep 30, 2024
ac82a79
🚧 Added `pragma: allowlist secret`
Rafalz13 Sep 30, 2024
f03ef40
🚨 Fix linter and pre-commit errors
trymzet Sep 30, 2024
ceefe7c
🐛 Removed `src`
Rafalz13 Sep 30, 2024
95a5d29
✅ Updated tests
Rafalz13 Oct 1, 2024
9eaf43b
🐛 Fixed names
Rafalz13 Oct 1, 2024
73e303e
🐛 Added fixtures
Rafalz13 Oct 1, 2024
e44f5ef
Update tests/unit/test_azure_sql.py
Rafalz13 Oct 2, 2024
735311d
🎨 Moved operations from task to source and created new `to_df()` method
Rafalz13 Oct 2, 2024
c74367f
🎨 Changed to `map` and added `super()` in to_df method
Rafalz13 Oct 4, 2024
ce82748
📝 Add connector documentation
trymzet Oct 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/references/orchestration/prefect/flows.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,5 @@
::: viadot.orchestration.prefect.flows.sql_server_to_minio

::: viadot.orchestration.prefect.flows.sql_server_to_parquet

::: viadot.orchestration.prefect.flows.azure_sql_to_adls
2 changes: 2 additions & 0 deletions docs/references/orchestration/prefect/tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,5 @@
::: viadot.orchestration.prefect.tasks.sql_server_query

::: viadot.orchestration.prefect.tasks.sql_server_to_df

::: viadot.orchestration.prefect.tasks.azure_sql_to_df
2 changes: 2 additions & 0 deletions docs/references/sources/sql.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,5 @@
::: viadot.sources.sap_rfc.SAPRFC

::: viadot.sources.sap_rfc.SAPRFCV2

::: viadot.sources.azure_sql.AzureSQL
2 changes: 2 additions & 0 deletions src/viadot/orchestration/prefect/flows/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Import flows."""

from .azure_sql_to_adls import azure_sql_to_adls
from .bigquery_to_adls import bigquery_to_adls
from .cloud_for_customers_to_adls import cloud_for_customers_to_adls
from .cloud_for_customers_to_databricks import cloud_for_customers_to_databricks
Expand Down Expand Up @@ -34,6 +35,7 @@


__all__ = [
"azure_sql_to_adls",
"bigquery_to_adls",
"cloud_for_customers_to_adls",
"cloud_for_customers_to_databricks",
Expand Down
77 changes: 77 additions & 0 deletions src/viadot/orchestration/prefect/flows/azure_sql_to_adls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
"""Flows for downloading data from Azure SQL and uploading it to Azure ADLS."""

from typing import Any

from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner

from viadot.orchestration.prefect.tasks import azure_sql_to_df, df_to_adls


@flow(
name="Azure SQL extraction to ADLS",
description="Extract data from Azure SQL"
+ " and load it into Azure Data Lake Storage.",
retries=1,
retry_delay_seconds=60,
task_runner=ConcurrentTaskRunner,
log_prints=True,
)
def azure_sql_to_adls(
query: str | None = None,
credentials_secret: str | None = None,
validate_df_dict: dict[str, Any] | None = None,
convert_bytes: bool = False,
remove_special_characters: bool | None = None,
columns_to_clean: list[str] | None = None,
adls_config_key: str | None = None,
adls_azure_key_vault_secret: str | None = None,
adls_path: str | None = None,
adls_path_overwrite: bool = False,
) -> None:
r"""Download data from Azure SQL to a CSV file and uploading it to ADLS.

Args:
query (str): Query to perform on a database. Defaults to None.
credentials_secret (str, optional): The name of the Azure Key Vault
secret containing a dictionary with database credentials.
Defaults to None.
validate_df_dict (Dict[str], optional): A dictionary with optional list of
tests to verify the output dataframe. If defined, triggers the `validate_df`
task from task_utils. Defaults to None.
convert_bytes (bool). A boolean value to trigger method df_converts_bytes_to_int
It is used to convert bytes data type into int, as pulling data with bytes
can lead to malformed data in data frame.
Defaults to False.
remove_special_characters (str, optional): Call a function that remove
special characters like escape symbols. Defaults to None.
columns_to_clean (List(str), optional): Select columns to clean, used with
remove_special_characters. If None whole data frame will be processed.
Defaults to None.
adls_config_key (Optional[str], optional): The key in the viadot config holding
relevant credentials. Defaults to None.
adls_azure_key_vault_secret (Optional[str], optional): The name of the Azure Key
Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal
credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake.
Defaults to None.
adls_path (Optional[str], optional): Azure Data Lake destination file path (with
file name). Defaults to None.
adls_path_overwrite (bool, optional): Whether to overwrite the file in ADLS.
Defaults to True.
"""
data_frame = azure_sql_to_df(
query=query,
credentials_secret=credentials_secret,
validate_df_dict=validate_df_dict,
convert_bytes=convert_bytes,
remove_special_characters=remove_special_characters,
columns_to_clean=columns_to_clean,
)

return df_to_adls(
df=data_frame,
path=adls_path,
credentials_secret=adls_azure_key_vault_secret,
config_key=adls_config_key,
overwrite=adls_path_overwrite,
)
2 changes: 2 additions & 0 deletions src/viadot/orchestration/prefect/tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Imports."""

from .adls import adls_upload, df_to_adls
from .azure_sql import azure_sql_to_df
from .bcp import bcp
from .bigquery import bigquery_to_df
from .cloud_for_customers import cloud_for_customers_to_df
Expand Down Expand Up @@ -31,6 +32,7 @@


__all__ = [
"azure_sql_to_df",
"adls_upload",
"bcp",
"clone_repo",
Expand Down
70 changes: 70 additions & 0 deletions src/viadot/orchestration/prefect/tasks/azure_sql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""Task for downloading data from Azure SQL."""

from typing import Any, Literal

import pandas as pd
from prefect import task

from viadot.orchestration.prefect.utils import get_credentials
from viadot.sources import AzureSQL
from viadot.utils import validate


@task(retries=3, retry_delay_seconds=10, timeout_seconds=60 * 60)
def azure_sql_to_df(
query: str | None = None,
credentials_secret: str | None = None,
validate_df_dict: dict[str, Any] | None = None,
convert_bytes: bool = False,
remove_special_characters: bool | None = None,
columns_to_clean: list[str] | None = None,
if_empty: Literal["warn", "skip", "fail"] = "warn",
) -> pd.DataFrame:
r"""Task to download data from Azure SQL.

Args:
query (str): Query to perform on a database. Defaults to None.
credentials_secret (str, optional): The name of the Azure Key Vault
secret containing a dictionary with database credentials.
Defaults to None.
validate_df_dict (Dict[str], optional): A dictionary with optional list of
tests to verify the output dataframe. If defined, triggers the `validate_df`
task from task_utils. Defaults to None.
convert_bytes (bool). A boolean value to trigger method df_converts_bytes_to_int
It is used to convert bytes data type into int, as pulling data with bytes
can lead to malformed data in data frame.
Defaults to False.
remove_special_characters (str, optional): Call a function that remove
special characters like escape symbols. Defaults to None.
columns_to_clean (List(str), optional): Select columns to clean, used with
remove_special_characters. If None whole data frame will be processed.
Defaults to None.
if_empty (Literal["warn", "skip", "fail"], optional): What to do if the
query returns no data. Defaults to None.

Raises:
ValueError: Raising ValueError if credentials_secret is not provided

Returns:
pd.DataFrame: The response data as a pandas DataFrame.
"""
if not credentials_secret:
msg = "`credentials_secret` has to be specified and not empty."
raise ValueError(msg)

credentials = get_credentials(credentials_secret)

azure_sql = AzureSQL(credentials=credentials)

df = azure_sql.to_df(
query=query,
if_empty=if_empty,
convert_bytes=convert_bytes,
remove_special_characters=remove_special_characters,
columns_to_clean=columns_to_clean,
)

if validate_df_dict is not None:
validate(df=df, tests=validate_df_dict)

return df
41 changes: 4 additions & 37 deletions src/viadot/orchestration/prefect/tasks/task_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ def dtypes_to_json_task(dtypes_dict: dict[str, Any], local_json_path: str) -> No
dtypes_dict (dict): Dictionary containing data types.
local_json_path (str): Path to local json file.
"""
with Path(local_json_path).open("w") as fp:
json.dump(dtypes_dict, fp)
with Path(local_json_path).open("w") as file_path:
json.dump(dtypes_dict, file_path)


@task
Expand Down Expand Up @@ -59,7 +59,7 @@ def get_sql_dtypes_from_df(df: pd.DataFrame) -> dict:
"Categorical": "VARCHAR(500)",
"Time": "TIME",
"Boolean": "VARCHAR(5)", # Bool is True/False, Microsoft expects 0/1
"DateTime": "DATETIMEOFFSET", # DATETIMEOFFSET is the only timezone-aware dtype in TSQL
"DateTime": "DATETIMEOFFSET", # DATETIMEOFFSET is timezone-aware dtype in TSQL
"Object": "VARCHAR(500)",
"EmailAddress": "VARCHAR(50)",
"File": None,
Expand All @@ -73,7 +73,7 @@ def get_sql_dtypes_from_df(df: pd.DataFrame) -> dict:
"String": "VARCHAR(500)",
"IPAddress": "VARCHAR(39)",
"Path": "VARCHAR(255)",
"TimeDelta": "VARCHAR(20)", # datetime.datetime.timedelta; eg. '1 days 11:00:00'
"TimeDelta": "VARCHAR(20)", # datetime.datetime.timedelta; eg.'1 days 11:00:00'
"URL": "VARCHAR(255)",
"Count": "INT",
}
Expand Down Expand Up @@ -209,36 +209,3 @@ def union_dfs_task(dfs: list[pd.DataFrame]) -> pd.DataFrame:
different size of DataFrames NaN values can appear.
"""
return pd.concat(dfs, ignore_index=True)


@task
def df_clean_column(
df: pd.DataFrame, columns_to_clean: list[str] | None = None
) -> pd.DataFrame:
"""Remove special characters from a pandas DataFrame.

Args:
df (pd.DataFrame): The DataFrame to clean.
columns_to_clean (List[str]): A list of columns to clean. Defaults is None.

Returns:
pd.DataFrame: The cleaned DataFrame
"""
df = df.copy()

if columns_to_clean is None:
df.replace(
to_replace=[r"\\t|\\n|\\r", "\t|\n|\r"],
value=["", ""],
regex=True,
inplace=True,
)
else:
for col in columns_to_clean:
df[col].replace(
to_replace=[r"\\t|\\n|\\r", "\t|\n|\r"],
value=["", ""],
regex=True,
inplace=True,
)
return df
2 changes: 2 additions & 0 deletions src/viadot/sources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from ._duckdb import DuckDB
from ._trino import Trino
from .azure_sql import AzureSQL
from .bigquery import BigQuery
from .cloud_for_customers import CloudForCustomers
from .customer_gauge import CustomerGauge
Expand All @@ -24,6 +25,7 @@


__all__ = [
"AzureSQL",
"BigQuery",
"CloudForCustomers",
"CustomerGauge",
Expand Down
Loading