Merge pull request #16 from AbsaOSS/Release/2.0.0
Release/2.0.0
MDobransky authored Sep 21, 2024
2 parents a48165c + cf2a12c commit 79d51ad
Showing 55 changed files with 2,241 additions and 1,609 deletions.
1 change: 1 addition & 0 deletions .flake8
@@ -14,3 +14,4 @@ extend-ignore =
D100,
D104,
D107,
E203,
31 changes: 31 additions & 0 deletions CHANGELOG.md
@@ -1,6 +1,37 @@
# Change Log
All notable changes to this project will be documented in this file.

## 2.0.0 - 2024-mm-dd
#### Runner
- runner config now accepts environment variables
- restructured runner config
- added metadata and feature loader sections
- target moved to pipeline
- dependency date_col is now mandatory
custom extras config is available in each pipeline and is passed as a dictionary accessible under pipeline_config.extras
the general section is renamed to runner
- info_date_shift is always a list
- transformation header changed
- added argument to skip dependency checking
- added overrides parameter to allow for dynamic overriding of config values
- removed date_from and date_to from arguments, use overrides instead
#### Jobs
- jobs are now the main way to create all pipelines
- config holder removed from jobs
- metadata_manager and feature_loader are now available arguments, depending on configuration
added @config decorator for parsing configuration, with a use case similar to @datasource
reworked Resolver and added ModuleRegister
datasources are no longer registered just by importing them, and thus are no longer available to all jobs
register_dependency_callable and register_dependency_module added to register datasources
together, it is now possible to have two datasources with the same name but different implementations for two different jobs (see the sketch below)
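A minimal sketch of what the explicit registration might look like; only the import path is confirmed by the rialto/jobs/__init__.py diff below, while the call signature and module layout are assumptions:

```python
# Hypothetical usage sketch: register_dependency_module's exact signature and the
# team_a.datasources module are assumptions; only the import location is in this diff.
from rialto.jobs import register_dependency_module

import team_a.datasources  # assumed module containing @datasource definitions

# Registration is scoped to the registering job module: another job module can
# register team_b.datasources, whose "accounts" datasource can share the name
# but use a different implementation.
register_dependency_module(team_a.datasources)
```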
#### TableReader
- function signatures changed
- until -> date_until
- info_date_from -> date_from, info_date_to -> date_to
- date_column is now mandatory
removed TableReader's ability to infer the date column from table partitions or properties; the new call style is sketched below
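A minimal usage sketch of the new signatures, taken from the table_reader.py diff further down; the table and column names are made up:

```python
import datetime

from pyspark.sql import SparkSession
from rialto.common import TableReader

spark = SparkSession.builder.getOrCreate()
reader = TableReader(spark)  # no date_property / infer_partition options anymore

# date_column is mandatory; until -> date_until, info_date_from/to -> date_from/to
latest = reader.get_latest(
    "catalog.schema.accounts",
    date_column="information_date",
    date_until=datetime.date(2024, 9, 1),
)
history = reader.get_table(
    "catalog.schema.accounts",
    date_column="information_date",
    date_from=datetime.date(2024, 8, 1),
    date_to=datetime.date(2024, 8, 31),
)
```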
#### Loader
removed the DataLoader class; only PysparkFeatureLoader (with additional parameters) is now needed

## 1.3.0 - 2024-06-07

279 changes: 209 additions & 70 deletions README.md

Large diffs are not rendered by default.

707 changes: 378 additions & 329 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion pyproject.toml
@@ -1,7 +1,7 @@
[tool.poetry]
name = "rialto"

version = "1.3.1"
version = "2.0.0"

packages = [
{ include = "rialto" },
@@ -30,6 +30,8 @@ pytest-mock = "^3.11.1"
pandas = "^2.1.0"
flake8-broken-line = "^1.0.0"
loguru = "^0.7.2"
importlib-metadata = "^7.2.1"
numpy = "<2.0.0"

[tool.poetry.dev-dependencies]
pyspark = "^3.4.1"
2 changes: 1 addition & 1 deletion rialto/common/__init__.py
@@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from rialto.common.table_reader import TableReader
from rialto.common.table_reader import DataReader, TableReader
35 changes: 35 additions & 0 deletions rialto/common/env_yaml.py
@@ -0,0 +1,35 @@
import os
import re

import yaml
from loguru import logger

__all__ = ["EnvLoader"]

# Regex pattern to capture variable and the rest of the string
_path_matcher = re.compile(r"(?P<before>.*)\$\{(?P<env_name>[^}^{:]+)(?::(?P<default_value>[^}^{]*))?\}(?P<after>.*)")


def _path_constructor(loader, node):
value = node.value
match = _path_matcher.search(value)
if match:
before = match.group("before")
after = match.group("after")
sub = os.getenv(match.group("env_name"), match.group("default_value"))
if sub is None:
raise ValueError(f"Environment variable {match.group('env_name')} has no assigned value")
new_value = before + sub + after
logger.info(f"Config: Replacing {value}, with {new_value}")
return new_value
return value


class EnvLoader(yaml.SafeLoader):
"""Custom loader that replaces values with environment variables"""

pass


EnvLoader.add_implicit_resolver("!env_substitute", _path_matcher, None)
EnvLoader.add_constructor("!env_substitute", _path_constructor)
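A small usage sketch of this loader, mirroring how load_yaml in rialto/common/utils.py uses it; the environment variables and config keys are made up:

```python
import os

import yaml

from rialto.common.env_yaml import EnvLoader

os.environ["RUN_ENV"] = "dev"

config_text = """
runner:
  env: ${RUN_ENV}
  owner: ${PIPELINE_OWNER:data_team}
"""

# ${VAR} is replaced from the environment; ${VAR:default} falls back to the default
config = yaml.load(config_text, EnvLoader)
assert config == {"runner": {"env": "dev", "owner": "data_team"}}
```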
70 changes: 23 additions & 47 deletions rialto/common/table_reader.py
@@ -21,8 +21,6 @@
import pyspark.sql.functions as F
from pyspark.sql import DataFrame, SparkSession

from rialto.common.utils import get_date_col_property, get_delta_partition


class DataReader(metaclass=abc.ABCMeta):
"""
@@ -36,16 +34,15 @@ class DataReader(metaclass=abc.ABCMeta):
def get_latest(
self,
table: str,
until: Optional[datetime.date] = None,
date_column: str = None,
date_column: str,
date_until: Optional[datetime.date] = None,
uppercase_columns: bool = False,
) -> DataFrame:
"""
Get latest available date partition of the table until specified date
:param table: input table path
:param until: Optional until date (inclusive)
:param date_column: column to filter dates on, takes highest priority
:param date_until: Optional until date (inclusive)
:param uppercase_columns: Option to refactor all column names to uppercase
:return: Dataframe
"""
@@ -55,18 +52,17 @@ def get_latest(
def get_table(
self,
table: str,
info_date_from: Optional[datetime.date] = None,
info_date_to: Optional[datetime.date] = None,
date_column: str = None,
date_column: str,
date_from: Optional[datetime.date] = None,
date_to: Optional[datetime.date] = None,
uppercase_columns: bool = False,
) -> DataFrame:
"""
Get a whole table or a slice by selected dates
:param table: input table path
:param info_date_from: Optional date from (inclusive)
:param info_date_to: Optional date to (inclusive)
:param date_column: column to filter dates on, takes highest priority
:param date_from: Optional date from (inclusive)
:param date_to: Optional date to (inclusive)
:param uppercase_columns: Option to refactor all column names to uppercase
:return: Dataframe
"""
@@ -76,17 +72,13 @@ class TableReader(DataReader):
class TableReader(DataReader):
"""An implementation of data reader for databricks tables"""

def __init__(self, spark: SparkSession, date_property: str = "rialto_date_column", infer_partition: bool = False):
def __init__(self, spark: SparkSession):
"""
Init
:param spark:
:param date_property: Databricks table property specifying date column, take priority over inference
:param infer_partition: infer date column as tables partition from delta metadata
"""
self.spark = spark
self.date_property = date_property
self.infer_partition = infer_partition
super().__init__()

def _uppercase_column_names(self, df: DataFrame) -> DataFrame:
@@ -106,41 +98,26 @@ def _get_latest_available_date(self, df: DataFrame, date_col: str, until: Option
df = df.select(F.max(date_col)).alias("latest")
return df.head()[0]

def _get_date_col(self, table: str, date_column: str):
"""
Get tables date column
column specified at get_table/get_latest takes priority, if inference is enabled it
takes 2nd place, last resort is table property
"""
if date_column:
return date_column
elif self.infer_partition:
return get_delta_partition(self.spark, table)
else:
return get_date_col_property(self.spark, table, self.date_property)

def get_latest(
self,
table: str,
until: Optional[datetime.date] = None,
date_column: str = None,
date_column: str,
date_until: Optional[datetime.date] = None,
uppercase_columns: bool = False,
) -> DataFrame:
"""
Get latest available date partition of the table until specified date
:param table: input table path
:param until: Optional until date (inclusive)
:param date_until: Optional until date (inclusive)
:param date_column: column to filter dates on, takes highest priority
:param uppercase_columns: Option to refactor all column names to uppercase
:return: Dataframe
"""
date_col = self._get_date_col(table, date_column)
df = self.spark.read.table(table)

selected_date = self._get_latest_available_date(df, date_col, until)
df = df.filter(F.col(date_col) == selected_date)
selected_date = self._get_latest_available_date(df, date_column, date_until)
df = df.filter(F.col(date_column) == selected_date)

if uppercase_columns:
df = self._uppercase_column_names(df)
@@ -149,28 +126,27 @@ def get_latest(
def get_table(
self,
table: str,
info_date_from: Optional[datetime.date] = None,
info_date_to: Optional[datetime.date] = None,
date_column: str = None,
date_column: str,
date_from: Optional[datetime.date] = None,
date_to: Optional[datetime.date] = None,
uppercase_columns: bool = False,
) -> DataFrame:
"""
Get a whole table or a slice by selected dates
:param table: input table path
:param info_date_from: Optional date from (inclusive)
:param info_date_to: Optional date to (inclusive)
:param date_from: Optional date from (inclusive)
:param date_to: Optional date to (inclusive)
:param date_column: column to filter dates on, takes highest priority
:param uppercase_columns: Option to refactor all column names to uppercase
:return: Dataframe
"""
date_col = self._get_date_col(table, date_column)
df = self.spark.read.table(table)

if info_date_from:
df = df.filter(F.col(date_col) >= info_date_from)
if info_date_to:
df = df.filter(F.col(date_col) <= info_date_to)
if date_from:
df = df.filter(F.col(date_column) >= date_from)
if date_to:
df = df.filter(F.col(date_column) <= date_to)
if uppercase_columns:
df = self._uppercase_column_names(df)
return df
60 changes: 25 additions & 35 deletions rialto/common/utils.py
@@ -12,16 +12,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.

__all__ = ["load_yaml", "get_date_col_property", "get_delta_partition"]
__all__ = ["load_yaml", "cast_decimals_to_floats", "get_caller_module"]

import inspect
import os
from typing import Any
from typing import Any, List

import pyspark.sql.functions as F
import yaml
from pyspark.sql import DataFrame
from pyspark.sql.types import FloatType

from rialto.common.env_yaml import EnvLoader


def load_yaml(path: str) -> Any:
"""
@@ -34,50 +37,37 @@ def load_yaml(path: str) -> Any:
raise FileNotFoundError(f"Can't find {path}.")

with open(path, "r") as stream:
return yaml.safe_load(stream)
return yaml.load(stream, EnvLoader)


def get_date_col_property(spark, table: str, property: str) -> str:
def cast_decimals_to_floats(df: DataFrame) -> DataFrame:
"""
Retrieve a data column name from a given table property
Find all decimal types in the table and cast them to floats. Fixes errors in .toPandas() conversions.
:param spark: spark session
:param table: path to table
:param property: name of the property
:return: data column name
:param df: input df
:return: pyspark DataFrame with fixed types
"""
props = spark.sql(f"show tblproperties {table}")
date_col = props.filter(F.col("key") == property).select("value").collect()
if len(date_col):
return date_col[0].value
else:
raise RuntimeError(f"Table {table} has no property {property}.")
decimal_cols = [col_name for col_name, data_type in df.dtypes if "decimal" in data_type]
for c in decimal_cols:
df = df.withColumn(c, F.col(c).cast(FloatType()))

return df

def get_delta_partition(spark, table: str) -> str:
"""
Select first partition column of the delta table

:param table: full table name
:return: partition column name
def get_caller_module() -> Any:
"""
columns = spark.catalog.listColumns(table)
partition_columns = list(filter(lambda c: c.isPartition, columns))
if len(partition_columns):
return partition_columns[0].name
else:
raise RuntimeError(f"Delta table has no partitions: {table}.")
Get the module containing the function which is calling your function.
Inspects the call stack, where:
0th entry is this function
1st entry is the function which needs to know who called it
2nd entry is the calling function
def cast_decimals_to_floats(df: DataFrame) -> DataFrame:
"""
Find all decimal types in the table and cast them to floats. Fixes errors in .toPandas() conversions.
Therefore, we'll return a module which contains the function at the 2nd place on the stack.
:param df: pyspark DataFrame
:return: pyspark DataFrame with fixed types
:return: Python Module containing the calling function.
"""
decimal_cols = [col_name for col_name, data_type in df.dtypes if "decimal" in data_type]
for c in decimal_cols:
df = df.withColumn(c, F.col(c).cast(FloatType()))

return df
stack = inspect.stack()
last_stack = stack[2]
return inspect.getmodule(last_stack[0])
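A brief sketch of cast_decimals_to_floats in use, based on the implementation above; the schema and values are made up:

```python
from decimal import Decimal

from pyspark.sql import SparkSession
from pyspark.sql.types import DecimalType, StructField, StructType

from rialto.common.utils import cast_decimals_to_floats

spark = SparkSession.builder.getOrCreate()
schema = StructType([StructField("amount", DecimalType(10, 2))])
df = spark.createDataFrame([(Decimal("12.50"),)], schema)

# Decimal columns are cast to FloatType, avoiding awkward object-dtype columns in .toPandas()
pdf = cast_decimals_to_floats(df).toPandas()
print(pdf.dtypes)
```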
6 changes: 6 additions & 0 deletions rialto/jobs/__init__.py
@@ -11,3 +11,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from rialto.jobs.decorators import config_parser, datasource, job
from rialto.jobs.module_register import (
register_dependency_callable,
register_dependency_module,
)
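A hypothetical sketch of the decorators re-exported here; the parameter-injection behaviour and the config object's attributes are assumptions drawn from the changelog, not from code shown in this diff:

```python
from rialto.jobs import config_parser, datasource, job


@config_parser
def feature_settings(config):  # assumed: receives the pipeline configuration
    return {"threshold": config.extras["threshold"]}  # extras per the changelog


@datasource
def transactions(spark):  # assumed: dependencies such as spark are injected by name
    return spark.read.table("catalog.schema.transactions")  # hypothetical table


@job
def flagged_transactions(transactions, feature_settings):
    return transactions.filter(transactions.amount > feature_settings["threshold"])
```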