
Add support for dlt.apply_changes_from_snapshot #86

Open
ravi-databricks opened this issue Aug 7, 2024 · 1 comment
Labels: enhancement (New feature or request)

Comments

@ravi-databricks
Contributor

Provide support for dlt.apply_changes_from_snapshot

@ravi-databricks
Contributor Author

ravi-databricks commented Sep 27, 2024

Implementation Details:
Onboarding:

  1. Introduce a snapshot format inside the onboarding file
  2. Introduce a bronze_apply_changes_from_snapshot config; keys and scd_type are mandatory fields:
"bronze_apply_changes_from_snapshot": {
      "keys": ["id"],
      "scd_type": "1",
      "track_history_column_list": [],
      "track_history_except_column_list": []
}
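Since keys and scd_type are mandatory, the onboarding step could reject malformed configs early. The sketch below is illustrative only; validate_snapshot_config and the key sets mirror the config above but are not part of dlt-meta:

```python
# Hypothetical validation helper for the onboarding step; key names follow
# the bronze_apply_changes_from_snapshot config sketched above.
MANDATORY_KEYS = {"keys", "scd_type"}
OPTIONAL_KEYS = {"track_history_column_list", "track_history_except_column_list"}


def validate_snapshot_config(config: dict) -> dict:
    """Raise ValueError if mandatory fields are missing or unknown fields appear."""
    missing = MANDATORY_KEYS - config.keys()
    if missing:
        raise ValueError(
            f"bronze_apply_changes_from_snapshot missing mandatory fields: {sorted(missing)}"
        )
    unknown = config.keys() - MANDATORY_KEYS - OPTIONAL_KEYS
    if unknown:
        raise ValueError(f"bronze_apply_changes_from_snapshot has unknown fields: {sorted(unknown)}")
    return config


validate_snapshot_config({"keys": ["id"], "scd_type": "1"})
```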

DataflowPipeline:

  1. Add an argument to DataflowPipeline to accept a snapshot_reader_func
  2. snapshot_reader_func will be passed to dlt.apply_changes_from_snapshot during the bronze write
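The bronze write could translate the onboarding config into the keyword arguments of dlt.apply_changes_from_snapshot roughly as below. This is a hedged sketch: snapshot_kwargs is a hypothetical helper, and the parameter name used for a function-valued source (snapshot_and_version) should be checked against the DLT API before relying on it:

```python
# Hypothetical mapping from the bronze_apply_changes_from_snapshot config
# to dlt.apply_changes_from_snapshot keyword arguments. The dlt call
# itself is left commented out so the sketch stays framework-independent.
def snapshot_kwargs(target, config, snapshot_reader_func):
    kwargs = {
        "target": target,
        # Assumed parameter name for a function-valued snapshot source.
        "snapshot_and_version": snapshot_reader_func,
        "keys": config["keys"],
        "stored_as_scd_type": config["scd_type"],
    }
    # Only forward the optional history-tracking lists when non-empty.
    if config.get("track_history_column_list"):
        kwargs["track_history_column_list"] = config["track_history_column_list"]
    if config.get("track_history_except_column_list"):
        kwargs["track_history_except_column_list"] = config["track_history_except_column_list"]
    return kwargs


# dlt.apply_changes_from_snapshot(**snapshot_kwargs("bronze_table", config, next_snapshot_and_version))
```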

Usage:

  1. Provide a snapshot reader function in a notebook while invoking DataflowPipeline
  2. Introduce a new method:
# pip install dlt-meta
import dlt
from src.dataflow_spec import BronzeDataflowSpec


def exist(path):
    """Return True if the given path exists and is listable."""
    try:
        return dbutils.fs.ls(path) is not None
    except Exception:
        return False


def next_snapshot_and_version(latest_snapshot_version, dataflow_spec):
    latest_snapshot_version = latest_snapshot_version or 0
    next_version = latest_snapshot_version + 1
    bronze_dataflow_spec: BronzeDataflowSpec = dataflow_spec
    options = bronze_dataflow_spec.readerConfigOptions
    snapshot_format = bronze_dataflow_spec.sourceDetails["snapshot_format"]
    snapshot_root_path = bronze_dataflow_spec.sourceDetails["path"]
    snapshot_path = f"{snapshot_root_path}{next_version}.csv"
    if exist(snapshot_path):
        snapshot = spark.read.format(snapshot_format).options(**options).load(snapshot_path)
        return (snapshot, next_version)
    else:
        # No snapshot available
        return None


from src.dataflow_pipeline import DataflowPipeline

layer = spark.conf.get("layer", None)
DataflowPipeline.invoke_dlt_pipeline(spark, layer, next_snapshot_and_version=next_snapshot_and_version)
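The version-advance contract of next_snapshot_and_version (return the next snapshot and its version, or None to stop) can be exercised without Spark by stubbing the filesystem check and the DataFrame read. Everything below is an illustrative simulation, not part of dlt-meta:

```python
# Minimal simulation of the version-advance protocol: snapshots live at
# <root><version>.csv, and polling stops at the first missing version.
available = {"/snapshots/1.csv", "/snapshots/2.csv"}  # fake filesystem


def exist(path):
    return path in available


def next_snapshot_and_version(latest_version):
    next_version = (latest_version or 0) + 1
    path = f"/snapshots/{next_version}.csv"
    if exist(path):
        return (f"df({path})", next_version)  # stand-in for a DataFrame
    return None  # no further snapshot: the pipeline stops polling


versions = []
latest = None
while (result := next_snapshot_and_version(latest)) is not None:
    _, latest = result
    versions.append(latest)
```

Starting from latest = None, the loop consumes versions 1 and 2 and then stops, which is the behavior the notebook function above relies on.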

3 participants