# RLG-3595 rialto v2 (#12)
# Rialto

## <a id="runner"></a> 2.1 - runner
By default, a runner executes all jobs listed in the configuration file, for every viable execution date derived from that configuration for which the job has not yet run successfully (i.e. the corresponding date partition does not exist in storage).
This behavior can be modified with the following parameters and switches; a minimal invocation sketch follows the list.

* **feature_metadata_schema** - path to the schema where feature metadata is read and stored; needed for [maker](#maker) jobs and for jobs that use the feature [loader](#loader)
* **run_date** - date at which the runner is triggered (defaults to the day of running)
* **date_from** - starting date (defaults to run_date minus the configured watch period)
* **date_until** - end date (defaults to run_date)
* **feature_store_schema** - location of features; needed for jobs using the feature [loader](#loader)
* **custom_job_config** - dictionary of key-value pairs that will be accessible under the "config" variable in your rialto jobs
* **rerun** - rerun all jobs even if they already succeeded in past runs
* **op** - run only the selected operation / pipeline
* **skip_dependencies** - ignore dependency checks and run all jobs
* **overrides** - dictionary of overrides for the configuration
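
A minimal invocation sketch, assuming these switches map directly to keyword arguments of the `Runner` constructor and that `Runner` is importable from `rialto.runner` (see the technical documentation for the exact signature); the config path and pipeline name below are placeholders:

```python
from rialto.runner import Runner  # assumed import path

runner = Runner(
    spark,                                # an active SparkSession
    config_path="config/pipelines.yaml",  # hypothetical configuration file
    run_date="2024-06-30",                # trigger date (defaults to the day of running)
    rerun=True,                           # rerun jobs even if they succeeded before
    op="PipelineTable1",                  # run only this pipeline
    skip_dependencies=False,              # keep dependency checks enabled
)
runner()                                  # execute all selected jobs
```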

Transformations are not included in the runner itself; they are imported dynamically according to the configuration, so they must be installed locally.

A table created by the runner automatically gets the **rialto_date_column** table property, set according to the target partition column defined in the configuration.
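
As a quick sanity check, the property can be inspected with standard Spark SQL (the table name below is a placeholder):

```python
# List the table properties and look for rialto_date_column.
spark.sql("SHOW TBLPROPERTIES catalog.schema.pipeline_table1").show(truncate=False)
```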

### Configuration

```yaml
general:
  target_schema: catalog.schema # schema where tables will be created, must exist
  target_partition_column: INFORMATION_DATE # date to partition new tables on
  source_date_column_property: rialto_date_column # name of the date property on source tables
runner:
  watched_period_units: "months" # unit of default run period
  watched_period_value: 2 # value of default run period
  job: "run" # run for running the pipelines, check for only checking dependencies
  mail:
    to: # a list of email addresses
      - [email protected]
  # ...
pipelines: # a list of pipelines to run
  - name: ... # pipeline name
    # ...
    dependencies: # list of dependent tables
      - table: catalog.schema.table1
        name: "table1" # Optional table name, used to recall dependency details in transformation
        date_col: generation_date # Mandatory date column name
        interval: # mandatory availability interval, subtracted from scheduled day
          units: "days"
          value: 1
      - table: ... # another dependency
        interval:
          units: "months"
          value: 1
    target:
      target_schema: catalog.schema # schema where tables will be created, must exist
      target_partition_column: INFORMATION_DATE # date to partition new tables on
    metadata_manager: # optional
      metadata_schema: catalog.metadata # schema where metadata is stored
    feature_loader: # optional
      config_path: model_features_config.yaml # path to the feature loader configuration file
      feature_schema: catalog.feature_tables # schema where feature tables are stored
      metadata_schema: catalog.metadata # schema where metadata is stored
    extras: # optional arguments processed as a dictionary
      some_value: 3
      some_other_value: giraffe

  - name: PipelineTable1 # will be written as pipeline_table1
    module:
      # ...
    # ...
    interval:
      units: "days"
      value: 6
    target:
      target_schema: catalog.schema # schema where tables will be created, must exist
      target_partition_column: INFORMATION_DATE # date to partition new tables on
```

The configuration can be dynamically overridden by providing a dictionary of overrides to the runner. All overrides must adhere to the configuration schema, with the pipeline.extras section available for custom keys.
Here are a few examples of overrides:

#### Simple override of a single value
Specify the path to the value in the configuration file as a dot-separated string:

```python
Runner(
    spark,
    config_path="tests/overrider.yaml",
    run_date="2023-03-31",
    overrides={"runner.watched_period_value": 4},
)
```

#### Override list element
You can refer to list elements by their index (indices start at 0):
```python
overrides={"runner.mail.to[1]": "[email protected]"}
```

#### Append to list
You can append to a list by using index -1:
```python
overrides={"runner.mail.to[-1]": "[email protected]"}
```

#### Lookup by attribute value in a list
You can use the following syntax to find a specific element in a list by its attribute value:
```python
overrides={"pipelines[name=SimpleGroup].target.target_schema": "new_schema"}
```

#### Injecting/Replacing whole sections
You can directly replace a bigger section of the configuration by providing a dictionary.
When a section doesn't exist yet, it is added to the configuration, but it has to be added as a whole:
i.e. if the yaml file doesn't specify a feature_loader, you can't add just feature_loader.config_path, you need to add the whole section.
```python
overrides={"pipelines[name=SimpleGroup].feature_loader":
               {"config_path": "features_cfg.yaml",
                "feature_schema": "catalog.features",
                "metadata_schema": "catalog.metadata"}}
```

#### Multiple overrides
You can provide multiple overrides at once; the order in which they are applied is not guaranteed:
```python
overrides={"runner.watched_period_value": 4,
           "runner.watched_period_units": "weeks",
           "pipelines[name=SimpleGroup].target.target_schema": "new_schema",
           "pipelines[name=SimpleGroup].feature_loader":
               {"config_path": "features_cfg.yaml",
                "feature_schema": "catalog.features",
                "metadata_schema": "catalog.metadata"}
           }
```

## <a id="maker"></a> 2.2 - maker

We have a set of pre-defined dependencies:
* **dependencies** returns a dictionary containing the job dependencies config
* **table_reader** returns *TableReader*
* **feature_loader** provides *PysparkFeatureLoader*
* **metadata_manager** provides *MetadataManager*

Apart from that, each **datasource** also becomes a fully usable dependency. Note that this means datasources can also depend on other datasources - just beware of any circular dependencies!
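
For instance, a datasource could pull a whole feature group through the injected feature loader. This is only a sketch, assuming dependencies are requested simply by naming them as parameters (as in the fuller example below) and using a hypothetical feature group name:

```python
from pyspark.sql import DataFrame
from rialto.jobs.decorators import datasource
from rialto.loader import PysparkFeatureLoader


@datasource
def customer_features(run_date, feature_loader: PysparkFeatureLoader) -> DataFrame:
    # "CustomerFeatures" is a hypothetical feature group name.
    return feature_loader.get_group(group_name="CustomerFeatures", information_date=run_date)
```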

With that sorted out, we can now provide a quick example of the *rialto.jobs* module:
```python
import datetime

from pydantic import BaseModel
from pyspark.sql import DataFrame
import pyspark.sql.functions as F

from rialto.common import TableReader
from rialto.jobs.decorators import config, job, datasource
from rialto.runner.config_loader import PipelineConfig


class ConfigModel(BaseModel):
    some_value: int
    some_other_value: str


@config
def my_config(config: PipelineConfig):
    return ConfigModel(**config.extras)


@datasource
def my_datasource(run_date: datetime.date, table_reader: TableReader) -> DataFrame:
    return table_reader.get_latest("my_catalog.my_schema.my_table", date_until=run_date)


@job
def my_job(my_datasource: DataFrame, my_config: ConfigModel) -> DataFrame:
    return my_datasource.withColumn("HelloWorld", F.lit(my_config.some_value))
```
This piece of code
1. creates a rialto transformation called *my_job*, which is then callable by the rialto runner.
2. It sources *my_datasource* and then runs *my_job* on top of that datasource.
3. Rialto adds VERSION (of your package) and INFORMATION_DATE (as per config) columns automatically.
4. The rialto runner stores the final result in a catalog, in a table named according to the job's name (see the read-back sketch below).
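
As an illustration, the stored output could be read back with *TableReader*; the schema and table name below are hypothetical and depend on the configured target:

```python
from rialto.common import TableReader

reader = TableReader(spark=spark_instance)

# Hypothetical target table written by the runner for my_job.
df = reader.get_latest(table="catalog.schema.my_job", date_column="INFORMATION_DATE")

# VERSION and INFORMATION_DATE are added on top of the job's own columns.
df.select("HelloWorld", "VERSION", "INFORMATION_DATE").show()
```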

```python
# ... (imports elided in the diff; they include the disable_job_decorators utility)
import my_package.test_job_module as tjm
# Datasource Testing
def test_datasource_a():
    ... mocks here ...

    with disable_job_decorators(tjm):
        datasource_a_output = tjm.datasource_a(... mocks ...)

    ... asserts ...


# Job Testing
def test_my_job():
    datasource_a_mock = ...
    ... other mocks ...

    with disable_job_decorators(tjm):
        job_output = tjm.my_job(datasource_a_mock, ... mocks ...)

    ... asserts ...
```

This module is used to load features from the feature store into your models and scripts.

Two public classes are exposed from this module: **DatabricksLoader** (DataLoader) and **PysparkFeatureLoader** (FeatureLoaderInterface).

### DatabricksLoader
This is a support class for the feature loader; it provides the capability to read data from the feature store.

This class needs to be instantiated with an active Spark session and a path to the feature store schema (in the format "catalog_name.schema_name").
Optionally, a date_column can be passed; otherwise it defaults to INFORMATION_DATE.
```python
from rialto.loader import DatabricksLoader

data_loader = DatabricksLoader(spark=spark_instance, schema="catalog.schema", date_column="INFORMATION_DATE")
```

This class provides one method, read_group(...), which returns a whole feature group for the selected date. It is mostly used inside the feature loader.

### PysparkFeatureLoader

This class needs to be instantiated with an active Spark session, the feature schema and the metadata schema (both in the format "catalog_name.schema_name"), and optionally a date column.

```python
from rialto.loader import PysparkFeatureLoader

feature_loader = PysparkFeatureLoader(spark=spark_instance, feature_schema="catalog.schema", metadata_schema="catalog.schema2", date_column="information_date")
```

> **Reviewer comment:** looks like you removed the Databricks reference explicitly for something more generic, but you're carrying over the concept of a catalog, which is Databricks specific.
>
> **Reviewer comment:** obviously this carries through to the rest of the changes too, i.e. you now reference PysparkFeatureLoader on its own and no longer together with DatabricksLoader.

#### Single feature

```python
from rialto.loader import PysparkFeatureLoader
from datetime import datetime

feature_loader = PysparkFeatureLoader(spark, "feature_catalog.feature_schema", "metadata_catalog.metadata_schema")
my_date = datetime.strptime("2020-01-01", "%Y-%m-%d").date()

feature = feature_loader.get_feature(group_name="CustomerFeatures", feature_name="AGE", information_date=my_date)
metadata = feature_loader.get_feature_metadata(group_name="CustomerFeatures", feature_name="AGE")
```

#### Feature group
This method of data access is only recommended for experimentation, as the group schema can evolve over time.

```python
from rialto.loader import PysparkFeatureLoader
from datetime import datetime

feature_loader = PysparkFeatureLoader(spark, "feature_catalog.feature_schema", "metadata_catalog.metadata_schema")
my_date = datetime.strptime("2020-01-01", "%Y-%m-%d").date()

features = feature_loader.get_group(group_name="CustomerFeatures", information_date=my_date)
metadata = feature_loader.get_group_metadata(group_name="CustomerFeatures")
```

#### Configuration

```python
from rialto.loader import PysparkFeatureLoader
from datetime import datetime

feature_loader = PysparkFeatureLoader(spark, "feature_catalog.feature_schema", "metadata_catalog.metadata_schema")
my_date = datetime.strptime("2020-01-01", "%Y-%m-%d").date()

features = feature_loader.get_features_from_cfg(path="local/configuration/file.yaml", information_date=my_date)
```

```python
reader = TableReader(spark=spark_instance)
```

Usage of _get_table_:

```python
# get whole table
df = reader.get_table(table="catalog.schema.table", date_column="information_date")

# ...
from datetime import datetime

start = datetime.strptime("2020-01-01", "%Y-%m-%d").date()
end = datetime.strptime("2024-01-01", "%Y-%m-%d").date()

df = reader.get_table(table="catalog.schema.table", date_from=start, date_to=end, date_column="information_date")
```

Usage of _get_latest_:

```python
# most recent partition
df = reader.get_latest(table="catalog.schema.table", date_column="information_date")

# most recent partition until a given date
until = datetime.strptime("2020-01-01", "%Y-%m-%d").date()

df = reader.get_latest(table="catalog.schema.table", date_until=until, date_column="information_date")
```

For full information on parameters and their optionality, see the technical documentation.

_TableReader_ needs an active Spark session and information about which column is the **date column**.
There are three ways to pass that information on, in order of priority from highest:

* Explicit _date_column_ parameter in _get_table_ and _get_latest_:
```python
reader.get_latest(table="catalog.schema.table", date_column="information_date")
```
* Inferred from delta metadata; triggered by an init parameter and works only on delta tables (e.g. it doesn't work on views):
```python
reader = TableReader(spark=spark_instance, infer_partition=True)
reader.get_latest(table="catalog.schema.table")
```
* A custom SQL property defined on the table containing the date column name; the property name defaults to _rialto_date_column_ (a sketch for setting the property follows the list):
```python
reader = TableReader(spark=spark_instance, date_property="rialto_date_column")
reader.get_latest(table="catalog.schema.table")
```
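
If a source table does not carry that property yet, it can be set with standard Spark SQL; this is only a sketch with a hypothetical table name, and the property name should match the configured _source_date_column_property_:

```python
# Set the rialto_date_column property on a (hypothetical) source table.
spark.sql(
    "ALTER TABLE catalog.schema.table "
    "SET TBLPROPERTIES ('rialto_date_column' = 'information_date')"
)
```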

# <a id="contributing"></a> 3. Contributing
Contributing:

> **Reviewer comment** (on the list-element override example): maybe confusing to say indices start at 0 but use 1 in the example :P Also, do you REALLY need to state the starting index?