Skip to content

Commit

Permalink
Jobs Changes Docs
Browse files Browse the repository at this point in the history
  • Loading branch information
vvancak committed Jul 25, 2024
1 parent 2924719 commit 47b4c7a
Showing 1 changed file with 74 additions and 10 deletions.
84 changes: 74 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -321,34 +321,98 @@ def my_datasource(run_date: datetime.date, table_reader: TableReader) -> DataFra
def my_job(my_datasource: DataFrame) -> DataFrame:
return my_datasource.withColumn("HelloWorld", F.lit(1))
```
This piece of code creates a rialto transformation called *my_job*, which is then callable by the rialto runner. It first sources the *my_datasource* and then runs *my_job* on top of that datasource.

### job naming / outputs
The rialto runner creates a final table according to the job's name. Therefore, we do support 2 ways of creating jobs:
This piece of code
1. creates a rialto transformation called *my_job*, which is then callable by the rialto runner.
2. It sources the *my_datasource* and then runs *my_job* on top of that datasource.
3. Rialto adds VERSION (of your package) and INFORMATION_DATE (as per config) columns automatically.
4. The rialto runner stores the final to a catalog, to a table according to the job's name.

### Custom job names
Note, that by default, the rialto job name is your function name. To allow more flexibility, we therefore allow renaming of the job:
```python
@job("my_custom_name")
@job(custom_name="my_custom_name")
def f(...):
...
```
Just note that any *WeirdCaseNames* will be transformed to *lower_case_with_underscores*.

@job
def my_custom_name(...):
### Disabling Versioning
If you want to disable versioning of your job (adding package VERSION column to your output):

```python3
@job(disable_version=True)
def my_job(...):
...
```
Up to you, both work. Just note that any *WeirdCaseNames* will be transformed to *lower_case_with_underscores*.

### notes / rules
These parameters can be used separately, or combined.

### Notes & Rules
The rules for the dependencies are fairly straightforward.
Both **jobs** and **datasources** can only depend on *pre-defined* dependencies and other *datasources*. Meaning:
* *datasource -> datasource -> job* is perfectly fine,
* *datasource -> job -> datasource* will result in an error.

Secondly, the jobs can, but **don't necessarily have to output** a dataframe.
In case your job doesn't output a dataframe, your job will only return a bunch of rows, which will ensure that rialto notices that the job ran successfully.
In case your job doesn't output a dataframe, your job will return an artificially-created, one-row dataframe, which will ensure that rialto notices that the job ran successfully.
This can be useful in **model training**.

Finally, remember, that your jobs are still just *Rialto Transformations* internally.
Meaning that at the end of the day, you should always read some data, do some operations on it and either return a pyspark DataFrame, or not return anything and let the framework return the placeholder one.

### Testing
One of the main advantages of the jobs module is simplification of unit tests for your transformations. Rialto provides following tools:

#### 1. Disabling Decorators

Assuming we have a my_job.test_job_module.py module:
```python3
@datasource
def datasource_a(...)
... code ...

@job
def my_job(datasource_a, ...)
... code ...
```
The *disable_job_decorators* context manager, as the name suggests, disables all decorator functionality and lets you access your functions as raw functions - making it super simple to unit-test:
```python3
from rialto.jobs.decorators.test_utils import disable_job_decorators
import my_job.test_job_module as tjm

# Datasource Testing
def test_datasource_a():
... mocks here ...

with disable_job_decorators(tjm):
datasource_a_output = tjm.datasource_a(... mocks ...)

... asserts ...

# Job Testing
def test_my_job():
datasource_a_mock = ...
... other mocks...

with disable_job_decorators(tjm):
job_output = tjm.my_job(datasource_a_mock, ... mocks ...)

... asserts ...
```

#### 2. Testing the @job Dependency Tree
In complex use cases, it may happen that the dependencies of a job become quite complex. Or you simply want to be sure that you didn't accidentally misspelled your dependency name:

```python3
from rialto.jobs.decorators.test_utils import resolver_resolves
import my_job.test_job_module as tjm

def test_my_job_resolves(spark):
assert resolver_resolves(spark, tjm.my_job)
```

The code above fails if *my_job* depends on an undefined datasource (even indirectly), and detects cases where there's a circular dependency.

## <a id="loader"></a> 2.4 - loader
This module is used to load features from feature store into your models and scripts. Loader provides options to load singular features, whole feature groups, as well as a selection of features from multiple groups defined in a config file, and served as a singular dataframe. It also provides interface to access feature metadata.

Expand Down

0 comments on commit 47b4c7a

Please sign in to comment.