added config job
MDobransky committed Sep 3, 2024
1 parent 3f2ce2f commit 68a7ad7
Showing 11 changed files with 73 additions and 47 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -14,10 +14,12 @@ All notable changes to this project will be documented in this file.
- transformation header changed
- added argument to skip dependency checking
- added overrides parameter to allow for dynamic overriding of config values
- removed date_from and date_until from arguments, use overrides instead
#### Jobs
- jobs are now the main way to create all pipelines
- config holder removed from jobs
- metadata_manager and feature_loader are now available as job arguments, depending on the configuration
- added @config decorator (similar in use to @datasource) for parsing configuration
#### TableReader
- function signatures changed
- until -> date_until
23 changes: 17 additions & 6 deletions README.md
@@ -1,4 +1,4 @@


# Rialto

@@ -54,8 +54,6 @@ A runner by default executes all the jobs provided in the configuration file, for…
This behavior can be modified by the parameters and switches below; a short usage sketch follows the list.

* **run_date** - date at which the runner is triggered (defaults to day of running)
* **date_from** - starting date (defaults to rundate - config watch period)
* **date_until** - end date (defaults to rundate)
* **rerun** - rerun all jobs even if they already succeeded in past runs
* **op** - run only selected operation / pipeline
* **skip_dependencies** - ignore dependency checks and run all jobs
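
A minimal sketch of wiring these switches into a runner call. The config path, run date, and pipeline name are placeholders, the import path is assumed from this commit's file layout, and the execution entry point itself is not shown in this diff:

```python
from pyspark.sql import SparkSession

from rialto.runner.runner import Runner  # import path assumed from this commit's file layout

spark = SparkSession.builder.getOrCreate()

runner = Runner(
    spark,
    config_path="config/pipelines.yaml",  # placeholder path
    run_date="2024-09-01",                # defaults to today's date when omitted
    rerun=False,                          # True re-runs jobs that already succeeded
    op="my_pipeline",                     # placeholder pipeline name; omit to run everything
    skip_dependencies=False,              # True ignores dependency checks
)
# The actual run entry point is not part of this diff.
```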
@@ -131,6 +129,9 @@ pipelines: # a list of pipelines to run
    interval:
      units: "days"
      value: 6
    target:
      target_schema: catalog.schema # schema where tables will be created, must exist
      target_partition_column: INFORMATION_DATE # date to partition new tables on
```
The configuration can be dynamically overridden by providing a dictionary of overrides to the runner. All overrides must adhere to the configuration schema, with the pipeline.extras section available for a custom schema.
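
For instance, the updated tests in this commit override the watched period with dotted configuration keys; a hedged sketch of the same pattern (the extras key is hypothetical and only illustrates where custom values would live):

```python
from pyspark.sql import SparkSession
from rialto.runner.runner import Runner  # import path assumed

spark = SparkSession.builder.getOrCreate()

runner = Runner(
    spark,
    config_path="tests/runner/transformations/config.yaml",
    run_date="2023-03-31",
    overrides={
        # dotted keys follow the configuration schema, as in the updated test_runner.py
        "runner.watched_period_units": "weeks",
        "runner.watched_period_value": 2,
        # "pipelines.<pipeline_name>.extras.some_value": 42,  # hypothetical key path, not shown in this commit
    },
)
```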
@@ -371,17 +372,27 @@ With that sorted out, we can now provide a quick example of the *rialto.jobs* module…
```python
import datetime

from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from rialto.common import TableReader
from rialto.jobs.decorators import job, datasource
from rialto.jobs.decorators import config, job, datasource
from rialto.runner.config_loader import PipelineConfig
from pydantic import BaseModel


class ConfigModel(BaseModel):
    some_value: int
    some_other_value: str

@config
def my_config(config: PipelineConfig):
    return ConfigModel(**config.extras)

@datasource
def my_datasource(run_date: datetime.date, table_reader: TableReader) -> DataFrame:
    return table_reader.get_latest("my_catalog.my_schema.my_table", date_until=run_date)


@job
def my_job(my_datasource: DataFrame) -> DataFrame:
    return my_datasource.withColumn("HelloWorld", F.lit(1))
def my_job(my_datasource: DataFrame, my_config: ConfigModel) -> DataFrame:
    return my_datasource.withColumn("HelloWorld", F.lit(my_config.some_value))
```
This piece of code
1. creates a rialto transformation called *my_job*, which is then callable by the rialto runner.
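
To feed the `ConfigModel` above from the runner configuration, the pipeline's `extras` block would need to carry matching fields; a hedged YAML sketch (values are placeholders):

```yaml
# hypothetical pipeline fragment; only the extras block is shown
extras:
  some_value: 42
  some_other_value: "hello"
```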
2 changes: 1 addition & 1 deletion rialto/jobs/__init__.py
@@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from rialto.jobs.decorators import datasource, job
from rialto.jobs.decorators import config, datasource, job
2 changes: 1 addition & 1 deletion rialto/jobs/decorators/__init__.py
@@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from .decorators import datasource, job
from .decorators import config, datasource, job
16 changes: 15 additions & 1 deletion rialto/jobs/decorators/decorators.py
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

__all__ = ["datasource", "job"]
__all__ = ["datasource", "job", "config"]

import inspect
import typing
@@ -24,6 +24,20 @@
from rialto.jobs.decorators.resolver import Resolver


def config(ds_getter: typing.Callable) -> typing.Callable:
    """
    Config parser functions decorator.
    Registers a config parsing function as a rialto job prerequisite.
    You can then request the parsed config via the job function's arguments.
    :param ds_getter: config parser function
    :return: raw parser function, unchanged
    """
    Resolver.register_callable(ds_getter)
    return ds_getter


def datasource(ds_getter: typing.Callable) -> typing.Callable:
"""
Dataset reader functions decorator.
7 changes: 5 additions & 2 deletions rialto/jobs/decorators/test_utils.py
@@ -17,9 +17,10 @@
import importlib
import typing
from contextlib import contextmanager
from unittest.mock import patch, create_autospec, MagicMock
from rialto.jobs.decorators.resolver import Resolver, ResolverException
from unittest.mock import MagicMock, create_autospec, patch

from rialto.jobs.decorators.job_base import JobBase
from rialto.jobs.decorators.resolver import Resolver, ResolverException


def _passthrough_decorator(*args, **kwargs) -> typing.Callable:
@@ -34,6 +35,8 @@ def _disable_job_decorators() -> None:
    patches = [
        patch("rialto.jobs.decorators.datasource", _passthrough_decorator),
        patch("rialto.jobs.decorators.decorators.datasource", _passthrough_decorator),
        patch("rialto.jobs.decorators.config", _passthrough_decorator),
        patch("rialto.jobs.decorators.decorators.config", _passthrough_decorator),
        patch("rialto.jobs.decorators.job", _passthrough_decorator),
        patch("rialto.jobs.decorators.decorators.job", _passthrough_decorator),
    ]
3 changes: 1 addition & 2 deletions rialto/runner/config_loader.py
@@ -71,7 +71,6 @@ class MetadataManagerConfig(BaseModel):


class FeatureLoaderConfig(BaseModel):
    config_path: str
    feature_schema: str
    metadata_schema: str

@@ -81,7 +80,7 @@ class PipelineConfig(BaseModel):
    module: ModuleConfig
    schedule: ScheduleConfig
    dependencies: Optional[List[DependencyConfig]] = []
    target: Optional[TargetConfig] = None
    target: TargetConfig = None
    metadata_manager: Optional[MetadataManagerConfig] = None
    feature_loader: Optional[FeatureLoaderConfig] = None
    extras: Optional[Dict] = {}
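
With `config_path` removed from `FeatureLoaderConfig`, a feature_loader block presumably needs only the two schema fields; a hedged YAML sketch with placeholder names:

```yaml
# hypothetical pipeline fragment; schema names are placeholders
feature_loader:
  feature_schema: catalog.features
  metadata_schema: catalog.feature_metadata
```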
27 changes: 9 additions & 18 deletions rialto/runner/runner.py
@@ -39,8 +39,6 @@ def __init__(
        spark: SparkSession,
        config_path: str,
        run_date: str = None,
        date_from: str = None,
        date_until: str = None,
        rerun: bool = False,
        op: str = None,
        skip_dependencies: bool = False,
@@ -49,9 +47,6 @@
        self.spark = spark
        self.config = get_pipelines_config(config_path, overrides)
        self.reader = TableReader(spark)

        self.date_from = date_from
        self.date_until = date_until
        self.rerun = rerun
        self.skip_dependencies = skip_dependencies
        self.op = op
@@ -61,19 +56,15 @@ def __init__(
            run_date = DateManager.str_to_date(run_date)
        else:
            run_date = date.today()
        if self.date_from:
            self.date_from = DateManager.str_to_date(date_from)
        if self.date_until:
            self.date_until = DateManager.str_to_date(date_until)

        if not self.date_from:
            self.date_from = DateManager.date_subtract(
                run_date=run_date,
                units=self.config.runner.watched_period_units,
                value=self.config.runner.watched_period_value,
            )
        if not self.date_until:
            self.date_until = run_date

        self.date_from = DateManager.date_subtract(
            run_date=run_date,
            units=self.config.runner.watched_period_units,
            value=self.config.runner.watched_period_value,
        )

        self.date_until = run_date

        if self.date_from > self.date_until:
            raise ValueError(f"Invalid date range from {self.date_from} until {self.date_until}")
        logger.info(f"Running period from {self.date_from} until {self.date_until}")
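
The effect of the simplified date handling is visible in the updated `test_init_dates` below: with `run_date="2023-03-31"` and a watched period of 2 weeks, `date_from` becomes 2023-03-17 and `date_until` equals the run date. A hedged illustration of the same arithmetic (DateManager itself is not shown in this diff):

```python
from datetime import date, timedelta

# Illustration only: assumes DateManager.date_subtract behaves like plain
# timedelta subtraction for a watched period expressed in weeks.
run_date = date(2023, 3, 31)
date_from = run_date - timedelta(weeks=2)
date_until = run_date

assert date_from == date(2023, 3, 17)
assert date_until == date(2023, 3, 31)
```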
7 changes: 7 additions & 0 deletions tests/jobs/test_decorators.py
@@ -25,6 +25,13 @@ def test_dataset_decorator():
    assert test_dataset == "dataset_return"


def test_config_decorator():
    _ = import_module("tests.jobs.test_job.test_job")
    test_config = Resolver.resolve("custom_config")

    assert test_config == "config_return"


def _rialto_import_stub(module_name, class_name):
    module = import_module(module_name)
    class_obj = getattr(module, class_name)
5 changes: 4 additions & 1 deletion tests/jobs/test_job/test_job.py
@@ -11,9 +11,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from rialto.jobs.decorators import config, datasource, job


from rialto.jobs.decorators import datasource, job
@config
def custom_config():
    return "config_return"


@datasource
26 changes: 11 additions & 15 deletions tests/runner/test_runner.py
@@ -108,10 +108,10 @@ def test_init_dates(spark):
    runner = Runner(
        spark,
        config_path="tests/runner/transformations/config.yaml",
        date_from="2023-03-01",
        date_until="2023-03-31",
        run_date="2023-03-31",
        overrides={"runner.watched_period_units": "weeks", "runner.watched_period_value": 2},
    )
    assert runner.date_from == DateManager.str_to_date("2023-03-01")
    assert runner.date_from == DateManager.str_to_date("2023-03-17")
    assert runner.date_until == DateManager.str_to_date("2023-03-31")

    runner = Runner(
@@ -156,8 +156,7 @@ def test_check_dates_have_partition(spark, mocker):
    runner = Runner(
        spark,
        config_path="tests/runner/transformations/config.yaml",
        date_from="2023-03-01",
        date_until="2023-03-31",
        run_date="2023-03-31",
    )
    runner.reader = MockReader(spark)
    dates = ["2023-03-04", "2023-03-05", "2023-03-06"]
@@ -173,8 +172,7 @@ def test_check_dates_have_partition_no_table(spark, mocker):
    runner = Runner(
        spark,
        config_path="tests/runner/transformations/config.yaml",
        date_from="2023-03-01",
        date_until="2023-03-31",
        run_date="2023-03-31",
    )
    dates = ["2023-03-04", "2023-03-05", "2023-03-06"]
    dates = [DateManager.str_to_date(d) for d in dates]
@@ -193,8 +191,7 @@ def test_check_dependencies(spark, mocker, r_date, expected):
    runner = Runner(
        spark,
        config_path="tests/runner/transformations/config.yaml",
        date_from="2023-03-01",
        date_until="2023-03-31",
        run_date="2023-03-31",
    )
    runner.reader = MockReader(spark)
    res = runner.check_dependencies(runner.config.pipelines[0], DateManager.str_to_date(r_date))
@@ -207,8 +204,7 @@ def test_check_no_dependencies(spark, mocker):
    runner = Runner(
        spark,
        config_path="tests/runner/transformations/config.yaml",
        date_from="2023-03-01",
        date_until="2023-03-31",
        run_date="2023-03-31",
    )
    runner.reader = MockReader(spark)
    res = runner.check_dependencies(runner.config.pipelines[1], DateManager.str_to_date("2023-03-05"))
@@ -221,8 +217,8 @@ def test_select_dates(spark, mocker):
    runner = Runner(
        spark,
        config_path="tests/runner/transformations/config.yaml",
        date_from="2023-03-01",
        date_until="2023-03-31",
        run_date="2023-03-31",
        overrides={"runner.watched_period_units": "months", "runner.watched_period_value": 1},
    )
    runner.reader = MockReader(spark)

@@ -243,8 +239,8 @@ def test_select_dates_all_done(spark, mocker):
    runner = Runner(
        spark,
        config_path="tests/runner/transformations/config.yaml",
        date_from="2023-03-02",
        date_until="2023-03-02",
        run_date="2023-03-02",
        overrides={"runner.watched_period_units": "months", "runner.watched_period_value": 0},
    )
    runner.reader = MockReader(spark)

