diff --git a/README.md b/README.md index 142d23e..1f61b9e 100644 --- a/README.md +++ b/README.md @@ -321,34 +321,98 @@ def my_datasource(run_date: datetime.date, table_reader: TableReader) -> DataFra def my_job(my_datasource: DataFrame) -> DataFrame: return my_datasource.withColumn("HelloWorld", F.lit(1)) ``` -This piece of code creates a rialto transformation called *my_job*, which is then callable by the rialto runner. It first sources the *my_datasource* and then runs *my_job* on top of that datasource. - -### job naming / outputs -The rialto runner creates a final table according to the job's name. Therefore, we do support 2 ways of creating jobs: +This piece of code +1. creates a rialto transformation called *my_job*, which is then callable by the rialto runner. +2. It sources the *my_datasource* and then runs *my_job* on top of that datasource. +3. Rialto adds VERSION (of your package) and INFORMATION_DATE (as per config) columns automatically. +4. The rialto runner stores the final to a catalog, to a table according to the job's name. + +### Custom job names +Note, that by default, the rialto job name is your function name. To allow more flexibility, we therefore allow renaming of the job: ```python -@job("my_custom_name") +@job(custom_name="my_custom_name") def f(...): ... +``` +Just note that any *WeirdCaseNames* will be transformed to *lower_case_with_underscores*. -@job -def my_custom_name(...): +### Disabling Versioning +If you want to disable versioning of your job (adding package VERSION column to your output): + +```python3 +@job(disable_version=True) +def my_job(...): ... ``` -Up to you, both work. Just note that any *WeirdCaseNames* will be transformed to *lower_case_with_underscores*. -### notes / rules +These parameters can be used separately, or combined. + +### Notes & Rules The rules for the dependencies are fairly straightforward. Both **jobs** and **datasources** can only depend on *pre-defined* dependencies and other *datasources*. Meaning: * *datasource -> datasource -> job* is perfectly fine, * *datasource -> job -> datasource* will result in an error. Secondly, the jobs can, but **don't necessarily have to output** a dataframe. -In case your job doesn't output a dataframe, your job will only return a bunch of rows, which will ensure that rialto notices that the job ran successfully. +In case your job doesn't output a dataframe, your job will return an artificially-created, one-row dataframe, which will ensure that rialto notices that the job ran successfully. This can be useful in **model training**. Finally, remember, that your jobs are still just *Rialto Transformations* internally. Meaning that at the end of the day, you should always read some data, do some operations on it and either return a pyspark DataFrame, or not return anything and let the framework return the placeholder one. +### Testing +One of the main advantages of the jobs module is simplification of unit tests for your transformations. Rialto provides following tools: + +#### 1. Disabling Decorators + +Assuming we have a my_job.test_job_module.py module: +```python3 +@datasource +def datasource_a(...) + ... code ... + +@job +def my_job(datasource_a, ...) + ... code ... +``` +The *disable_job_decorators* context manager, as the name suggests, disables all decorator functionality and lets you access your functions as raw functions - making it super simple to unit-test: +```python3 +from rialto.jobs.decorators.test_utils import disable_job_decorators +import my_job.test_job_module as tjm + +# Datasource Testing +def test_datasource_a(): + ... mocks here ... + + with disable_job_decorators(tjm): + datasource_a_output = tjm.datasource_a(... mocks ...) + + ... asserts ... + +# Job Testing +def test_my_job(): + datasource_a_mock = ... + ... other mocks... + + with disable_job_decorators(tjm): + job_output = tjm.my_job(datasource_a_mock, ... mocks ...) + + ... asserts ... +``` + +#### 2. Testing the @job Dependency Tree +In complex use cases, it may happen that the dependencies of a job become quite complex. Or you simply want to be sure that you didn't accidentally misspelled your dependency name: + +```python3 +from rialto.jobs.decorators.test_utils import resolver_resolves +import my_job.test_job_module as tjm + +def test_my_job_resolves(spark): + assert resolver_resolves(spark, tjm.my_job) +``` + +The code above fails if *my_job* depends on an undefined datasource (even indirectly), and detects cases where there's a circular dependency. + ## 2.4 - loader This module is used to load features from feature store into your models and scripts. Loader provides options to load singular features, whole feature groups, as well as a selection of features from multiple groups defined in a config file, and served as a singular dataframe. It also provides interface to access feature metadata.