Merge code restructuring change to apply changes from snapshot feature #106

Merged: 39 commits, Oct 16, 2024

Commits (39)
7a6b28f
added integration test outputs to .gitignore
dvanderwood Oct 1, 2024
8fc401d
uc catalog name consistency in sample code
dvanderwood Oct 1, 2024
04adc91
updated runner notebook upload to .py files from .dbc so updates can …
dvanderwood Oct 2, 2024
4b9bffc
lower the source during the arg parse instead of at each time it is used
dvanderwood Oct 2, 2024
5a38c85
Complete rewrite of the arg parser for the integration tests
dvanderwood Oct 2, 2024
72fe9dc
Update args parsing
dvanderwood Oct 2, 2024
c337f59
begin moving to uc volumes only and getting rid of dbfs as well as si…
dvanderwood Oct 2, 2024
3e1e486
uc resource generation update
dvanderwood Oct 2, 2024
6a8eb47
continued unused file, and dbfs code removal along with code clean-up…
dvanderwood Oct 3, 2024
f360160
update upload to databricks
dvanderwood Oct 4, 2024
c678ac8
only upload wheel to workspace location
dvanderwood Oct 4, 2024
d6020b5
added uc volume upload of the wheel if provided
dvanderwood Oct 4, 2024
acee79b
wheel upload added, all data uploads reworked
dvanderwood Oct 4, 2024
d316554
initial upload and setup for integration testing for cloud files is done
dvanderwood Oct 4, 2024
7b71fc9
job workflow clean-up
dvanderwood Oct 7, 2024
5ee1843
cloud files testing, added back accidentally removed json
dvanderwood Oct 7, 2024
e70f85d
cloud files integration test works
dvanderwood Oct 7, 2024
9e77b1f
formatting
dvanderwood Oct 7, 2024
e78c751
continuing code simplification between 3 integration run types
dvanderwood Oct 8, 2024
1cdb503
redundant doc strings removal
dvanderwood Oct 8, 2024
37652f9
remove early exit
dvanderwood Oct 8, 2024
a6249fe
eventhub_accesskey_name updates
dvanderwood Oct 8, 2024
c740093
fixed job create
dvanderwood Oct 8, 2024
b769973
fixed clean up
dvanderwood Oct 8, 2024
f6bb5fe
redundant message removed
dvanderwood Oct 8, 2024
381531c
formatting
dvanderwood Oct 8, 2024
eb44fa5
non cloud files testing, neither can be confirmed right now
dvanderwood Oct 8, 2024
f6954fb
af cloud demo update
dvanderwood Oct 9, 2024
ec9a317
formatting
dvanderwood Oct 9, 2024
ae27548
formatting
dvanderwood Oct 9, 2024
a60d0e2
linting fixes
dvanderwood Oct 10, 2024
7b17edf
formatting and linting
dvanderwood Oct 14, 2024
0d9975a
removed match syntax since testing on python 3.9
dvanderwood Oct 14, 2024
386db9d
removed other match statement
dvanderwood Oct 14, 2024
05e8b3a
uc volume path for onboarding file test
dvanderwood Oct 14, 2024
b87620a
Merge pull request #101 from dvanderwood/issue_97_98
ravi-databricks Oct 14, 2024
27dadc8
fixed cli related tests and added new unit test for onboard and deploy
ravi-databricks Oct 15, 2024
0da22ce
Added unit test coverage for cli.py
ravi-databricks Oct 15, 2024
93b6fa4
Merge branch 'issue_86' into issue_97
ravi-databricks Oct 16, 2024
1 change: 0 additions & 1 deletion .coveragerc
@@ -8,7 +8,6 @@ omit =
src/install.py
src/uninstall.py
src/config.py
src/cli.py

[report]
exclude_lines =
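Dropping `src/cli.py` from the `omit` list means the CLI module now counts toward coverage, which lines up with the new unit tests for `cli.py` in this PR. As a hedged local check (assuming a standard `coverage`/`pytest` setup; the test directory name is a placeholder):

```commandline
pip install coverage pytest
coverage run -m pytest tests/
coverage report --include="src/cli.py"
```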
2 changes: 1 addition & 1 deletion .flake8
@@ -1,6 +1,6 @@
[flake8]
ignore = BLK100,E402,W503
exclude = .git,__pycache__,docs/source/conf.py,old,build,dist,dist,.eggs
exclude = .git,__pycache__,docs/source/conf.py,old,build,dist,dist,.eggs,integration_tests/notebooks/*/*.py,demo/notebooks/*/*.py,.venv
builtins = dlt,dbutils,spark,display,log_integration_test,pyspark.dbutils
max-line-length = 120
per-file-ignores =
6 changes: 5 additions & 1 deletion .gitignore
@@ -154,6 +154,10 @@ deployment-merged.yaml
.databricks
.databricks-login.json
demo/conf/onboarding.json
integration_tests/conf/onboarding.json
integration_tests/conf/onboarding*.json
demo/conf/onboarding*.json
databricks.yaml
integration_test_output*.csv

.databricks
databricks.yaml
37 changes: 19 additions & 18 deletions demo/README.md
@@ -1,4 +1,4 @@
# [DLT-META](https://github.com/databrickslabs/dlt-meta) DEMO's
# [DLT-META](https://github.com/databrickslabs/dlt-meta) DEMO's
1. [DAIS 2023 DEMO](#dais-2023-demo): Showcases DLT-META's capabilities of creating Bronze and Silver DLT pipelines with initial and incremental mode automatically.
2. [Databricks Techsummit Demo](#databricks-tech-summit-fy2024-demo): 100s of data sources ingestion in bronze and silver DLT pipelines automatically.
3. [Append FLOW Autoloader Demo](#append-flow-autoloader-file-metadata-demo): Write to same target from multiple sources using [dlt.append_flow](https://docs.databricks.com/en/delta-live-tables/flows.html#append-flows) and adding [File metadata column](https://docs.databricks.com/en/ingestion/file-metadata-column.html)
@@ -8,7 +8,7 @@



# DAIS 2023 DEMO
# DAIS 2023 DEMO
## [DAIS 2023 Session Recording](https://www.youtube.com/watch?v=WYv5haxLlfA)
This Demo launches Bronze and Silver DLT pipelines with following activities:
- Customer and Transactions feeds for initial load
@@ -21,7 +21,7 @@ This Demo launches Bronze and Silver DLT pipelines with following activities:
2. Install [Databricks CLI](https://docs.databricks.com/dev-tools/cli/index.html)

3. ```commandline
git clone https://github.com/databrickslabs/dlt-meta.git
git clone https://github.com/databrickslabs/dlt-meta.git
```

4. ```commandline
@@ -53,10 +53,10 @@ This demo will launch auto generated tables(100s) inside single bronze and silve
2. Install [Databricks CLI](https://docs.databricks.com/dev-tools/cli/index.html)

3. ```commandline
git clone https://github.com/databrickslabs/dlt-meta.git
git clone https://github.com/databrickslabs/dlt-meta.git
```

4. ```commandline
4. ```commandline
cd dlt-meta
```

@@ -69,8 +69,8 @@ This demo will launch auto generated tables(100s) inside single bronze and silve
export PYTHONPATH=$dlt_meta_home
```

6. ```commandline
python demo/launch_techsummit_demo.py --uc_catalog_name=<<>>
6. ```commandline
python demo/launch_techsummit_demo.py --uc_catalog_name=<<uc catalog name>>
```
- uc_catalog_name : Unity catalog name
- you can provide `--profile=databricks_profile name` in case you already have databricks cli otherwise command prompt will ask host and token
@@ -89,7 +89,7 @@ This demo will perform following tasks:
2. Install [Databricks CLI](https://docs.databricks.com/dev-tools/cli/index.html)

3. ```commandline
git clone https://github.com/databrickslabs/dlt-meta.git
git clone https://github.com/databrickslabs/dlt-meta.git
```

4. ```commandline
@@ -106,9 +106,10 @@ This demo will perform following tasks:
```

6. ```commandline
python demo/launch_af_cloudfiles_demo.py --uc_catalog_name=<<>>
python demo/launch_af_cloudfiles_demo.py --uc_catalog_name=<<uc catalog name>> --source=cloudfiles --cloud_provider_name=aws --profile=<<DEFAULT>>
```
- uc_catalog_name : Unity Catalog name
- cloud_provider_name : Which cloud you are using, either AWS, Azure, or GCP
- you can provide `--profile=databricks_profile name` in case you already have databricks cli otherwise command prompt will ask host and token

![af_am_demo.png](../docs/static/images/af_am_demo.png)
@@ -122,7 +123,7 @@ This demo will perform following tasks:
2. Install [Databricks CLI](https://docs.databricks.com/dev-tools/cli/index.html)

3. ```commandline
git clone https://github.com/databrickslabs/dlt-meta.git
git clone https://github.com/databrickslabs/dlt-meta.git
```

4. ```commandline
@@ -142,14 +143,14 @@ This demo will perform following tasks:
- ```commandline
  databricks secrets create-scope eventhubs_dltmeta_creds
  ```
- ```commandline
- ```commandline
databricks secrets put-secret --json '{
"scope": "eventhubs_dltmeta_creds",
"key": "RootManageSharedAccessKey",
"string_value": "<<value>>"
}'
}'
```
- Create databricks secrets to store producer and consumer keys using the scope created in step 2
- Create databricks secrets to store producer and consumer keys using the scope created in step 2

- Following are the mandatory arguments for running EventHubs demo
- uc_catalog_name : unity catalog name e.g. ravi_dlt_meta_uc
@@ -161,8 +162,8 @@ This demo will perform following tasks:
- eventhub_secrets_scope_name: Databricks secret scope name e.g. eventhubs_dltmeta_creds
- eventhub_port: Eventhub port

7. ```commandline
python3 demo/launch_af_eventhub_demo.py --uc_catalog_name=<<>> --eventhub_name=dltmeta_demo --eventhub_name_append_flow=dltmeta_demo_af --eventhub_secrets_scope_name=dltmeta_eventhub_creds --eventhub_namespace=dltmeta --eventhub_port=9093 --eventhub_producer_accesskey_name=RootManageSharedAccessKey --eventhub_consumer_accesskey_name=RootManageSharedAccessKey --eventhub_accesskey_secret_name=RootManageSharedAccessKey
7. ```commandline
python3 demo/launch_af_eventhub_demo.py --uc_catalog_name=<<uc catalog name>> --eventhub_name=dltmeta_demo --eventhub_name_append_flow=dltmeta_demo_af --eventhub_secrets_scope_name=dltmeta_eventhub_creds --eventhub_namespace=dltmeta --eventhub_port=9093 --eventhub_producer_accesskey_name=RootManageSharedAccessKey --eventhub_consumer_accesskey_name=RootManageSharedAccessKey --eventhub_accesskey_secret_name=RootManageSharedAccessKey
```

![af_eh_demo.png](../docs/static/images/af_eh_demo.png)
@@ -173,15 +174,15 @@ This demo will perform following tasks:
- Run the onboarding process for the bronze cars table, which contains data from various countries.
- Run the onboarding process for the silver tables, which have a `where_clause` based on the country condition specified in [silver_transformations_cars.json](https://github.com/databrickslabs/dlt-meta/blob/main/demo/conf/silver_transformations_cars.json).
- Run the Bronze DLT pipeline which will produce cars table.
- Run Silver DLT pipeline, fanning out from the bronze cars table to country-specific tables such as cars_usa, cars_uk, cars_germany, and cars_japan.
- Run Silver DLT pipeline, fanning out from the bronze cars table to country-specific tables such as cars_usa, cars_uk, cars_germany, and cars_japan.

### Steps:
1. Launch Command Prompt

2. Install [Databricks CLI](https://docs.databricks.com/dev-tools/cli/index.html)

3. ```commandline
git clone https://github.com/databrickslabs/dlt-meta.git
git clone https://github.com/databrickslabs/dlt-meta.git
```

4. ```commandline
@@ -252,4 +253,4 @@ This demo will perform following tasks:
```commandline
python demo/launch_acfs_demo.py --uc_catalog_name=<<>>
```
![acfs.png](../docs/static/images/acfs.png)
![acfs.png](../docs/static/images/acfs.png)
30 changes: 14 additions & 16 deletions demo/launch_af_cloudfiles_demo.py
@@ -7,6 +7,7 @@
get_workspace_api_client,
process_arguments
)
import traceback


class DLTMETAFCFDemo(DLTMETARunner):
@@ -30,6 +31,7 @@ def run(self, runner_conf: DLTMetaRunnerConf):
self.launch_workflow(runner_conf)
except Exception as e:
print(e)
traceback.print_exc()

def init_runner_conf(self) -> DLTMetaRunnerConf:
"""
@@ -44,7 +46,8 @@ def init_runner_conf(self) -> DLTMetaRunnerConf:
runner_conf = DLTMetaRunnerConf(
run_id=run_id,
username=self.wsi._my_username,
int_tests_dir="file:./demo",
uc_catalog_name=self.args["uc_catalog_name"],
int_tests_dir="demo",
dlt_meta_schema=f"dlt_meta_dataflowspecs_demo_{run_id}",
bronze_schema=f"dlt_meta_bronze_demo_{run_id}",
silver_schema=f"dlt_meta_silver_demo_{run_id}",
@@ -54,29 +57,24 @@ def init_runner_conf(self) -> DLTMetaRunnerConf:
cloudfiles_A2_template="demo/conf/cloudfiles-onboarding_A2.template",
onboarding_file_path="demo/conf/onboarding.json",
onboarding_A2_file_path="demo/conf/onboarding_A2.json",
env="demo"
env="demo",
runners_full_local_path='./demo/notebooks/afam_cloudfiles_runners/',
test_output_file_path=(
f"/Users/{self.wsi._my_username}/dlt_meta_demo/"
f"{run_id}/demo-output.csv"
),
)
runner_conf.uc_catalog_name = self.args.__dict__['uc_catalog_name']
runner_conf.runners_full_local_path = './demo/dbc/afam_cloud_files_runners.dbc'

return runner_conf

def launch_workflow(self, runner_conf: DLTMetaRunnerConf):
created_job = self.create_snapshot_workflow_spec(runner_conf)
created_job = self.create_workflow_spec(runner_conf)
self.open_job_url(runner_conf, created_job)


afam_args_map = {
"--profile": "provide databricks cli profile name, if not provide databricks_host and token",
"--uc_catalog_name": "provide databricks uc_catalog name, this is required to create volume, schema, table"
}

afam_mandatory_args = [
"uc_catalog_name"]


def main():
args = process_arguments(afam_args_map, afam_mandatory_args)
workspace_client = get_workspace_api_client(args.profile)
args = process_arguments()
workspace_client = get_workspace_api_client(args["profile"])
dltmeta_afam_demo_runner = DLTMETAFCFDemo(args, workspace_client, "demo")
print("initializing complete")
runner_conf = dltmeta_afam_demo_runner.init_runner_conf()
3 changes: 2 additions & 1 deletion demo/launch_dais_demo.py
@@ -6,7 +6,8 @@
DLTMETARunner,
DLTMetaRunnerConf,
get_workspace_api_client,
process_arguments
process_arguments,
cloud_node_type_id_dict
)


10 changes: 10 additions & 0 deletions demo/notebooks/afam_cloudfiles_runners/init_dlt_meta_pipeline.py
@@ -0,0 +1,10 @@
# Databricks notebook source
dlt_meta_whl = spark.conf.get("dlt_meta_whl")
%pip install $dlt_meta_whl # noqa : E999

# COMMAND ----------

layer = spark.conf.get("layer", None)

from src.dataflow_pipeline import DataflowPipeline
DataflowPipeline.invoke_dlt_pipeline(spark, layer)
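This runner notebook reads `dlt_meta_whl` and `layer` from the pipeline's Spark configuration, so whichever DLT pipeline invokes it must set both keys. A minimal sketch of creating such a pipeline with the Databricks CLI (the name, notebook path, and wheel path below are placeholders, not values from this PR):

```commandline
databricks pipelines create --json '{
  "name": "dlt_meta_bronze_demo",
  "libraries": [
    {"notebook": {"path": "/Workspace/Users/<user>/afam_cloudfiles_runners/init_dlt_meta_pipeline"}}
  ],
  "configuration": {
    "layer": "bronze",
    "dlt_meta_whl": "/Volumes/<catalog>/<schema>/<volume>/dlt_meta-<version>-py3-none-any.whl"
  }
}'
```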
39 changes: 39 additions & 0 deletions demo/notebooks/afam_cloudfiles_runners/validate.py
@@ -0,0 +1,39 @@
# Databricks notebook source
import pandas as pd

run_id = dbutils.widgets.get("run_id")
uc_enabled = eval(dbutils.widgets.get("uc_enabled"))
uc_catalog_name = dbutils.widgets.get("uc_catalog_name")
bronze_schema = dbutils.widgets.get("bronze_schema")
silver_schema = dbutils.widgets.get("silver_schema")
output_file_path = dbutils.widgets.get("output_file_path")
log_list = []

# Assumption is that to get to this notebook Bronze and Silver completed successfully
log_list.append("Completed Bronze DLT Pipeline.")
log_list.append("Completed Silver DLT Pipeline.")

UC_TABLES = {
f"{uc_catalog_name}.{bronze_schema}.transactions": 10002,
f"{uc_catalog_name}.{bronze_schema}.transactions_quarantine": 6,
f"{uc_catalog_name}.{bronze_schema}.customers": 51453,
f"{uc_catalog_name}.{bronze_schema}.customers_quarantine": 256,
f"{uc_catalog_name}.{silver_schema}.transactions": 8759,
f"{uc_catalog_name}.{silver_schema}.customers": 73212,
}


log_list.append("Validating DLT Bronze and Silver Table Counts...")
for table, counts in UC_TABLES.items():
query = spark.sql(f"SELECT count(*) as cnt FROM {table}")
cnt = query.collect()[0].cnt

log_list.append(f"Validating Counts for Table {table}.")
try:
assert int(cnt) == counts
log_list.append(f"Expected: {counts} Actual: {cnt}. Passed!")
except AssertionError:
log_list.append(f"Expected: {counts} Actual: {cnt}. Failed!")

pd_df = pd.DataFrame(log_list)
pd_df.to_csv(output_file_path)
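The notebook writes its log list as a one-column CSV (pandas index plus message) to `output_file_path`, which the runner later copies back locally as the output file referenced in step 10 of the integration README further down in this diff. A hedged post-run sanity check, with the file name as a placeholder:

```commandline
grep -c "Failed!" integration-test-output_<run_id>.txt   # expect 0 failed count checks
grep -c "Passed!" integration-test-output_<run_id>.txt   # expect one line per validated table
```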
29 changes: 14 additions & 15 deletions integration_tests/README.md
@@ -1,7 +1,7 @@
#### Run Integration Tests
1. Install [Databricks CLI](https://docs.databricks.com/dev-tools/cli/index.html)
- Once you install Databricks CLI, authenticate your current machine to a Databricks Workspace:

```commandline
databricks auth login --host WORKSPACE_HOST
```
@@ -29,26 +29,26 @@
7. ```commandline
dlt_meta_home=$(pwd)
```

8. ```commandline
export PYTHONPATH=$dlt_meta_home
```

9. Run integration test against cloudfile or eventhub or kafka using below options: If databricks profile configured using CLI then pass ```--profile <profile-name>``` to below command otherwise provide workspace url and token in command line
- 9a. Run the command for **cloudfiles**
```commandline
python integration_tests/run_integration_tests.py --uc_catalog_name= --source=cloudfiles
9. Run integration tests against cloudfiles, eventhub, or kafka using the options below. If a Databricks profile is configured via the CLI, pass ```--profile <profile-name>``` to the command below; otherwise provide the workspace url and token on the command line. You will also need to provide a Unity Catalog catalog in which the schemas, tables, and files will be created.

- 9a. Run the command for **cloudfiles**
```commandline
python integration_tests/run_integration_tests.py --uc_catalog_name=<<uc catalog name>> --source=cloudfiles --cloud_provider_name=aws --profile=<<DEFAULT>>
```

- 9b. Run the command for **eventhub**
```commandline
python integration_tests/run_integration_tests.py --uc_catalog_name=<<>> --source=eventhub --eventhub_name=iot --eventhub_secrets_scope_name=eventhubs_creds --eventhub_namespace=int_test-standard --eventhub_port=9093 --eventhub_producer_accesskey_name=producer --eventhub_consumer_accesskey_name=consumer
```commandline
python integration_tests/run_integration_tests.py --uc_catalog_name=<<uc catalog name>> --source=eventhub --cloud_provider_name=aws --eventhub_name=iot --eventhub_secrets_scope_name=eventhubs_creds --eventhub_namespace=int_test-standard --eventhub_port=9093 --eventhub_producer_accesskey_name=producer --eventhub_consumer_accesskey_name=consumer --eventhub_name_append_flow=test_append_flow --eventhub_accesskey_secret_name=test_secret_name --profile=<<DEFAULT>>
```

- - For eventhub integration tests, the following are the prerequisites:
1. Needs eventhub instance running
2. Use Databricks CLI, Create databricks secrets scope for eventhub keys (```databricks secrets create-scope eventhubs_creds```)
3. Use Databricks CLI, Create databricks secrets to store producer and consumer keys using the scope created in step
3. Use Databricks CLI, Create databricks secrets to store producer and consumer keys using the scope created in step

- - Following are the mandatory arguments for running EventHubs integration test
1. Provide your eventhub topic : --eventhub_name
@@ -61,14 +61,13 @@

- 9c. Run the command for **kafka**
```commandline
python integration_tests/run_integration_tests.py --uc_catalog_name=<<>> --source=kafka --kafka_topic_name=dlt-meta-integration-test --kafka_broker=host:9092
```
python integration_tests/run_integration_tests.py --uc_catalog_name=<<uc catalog name>> --source=kafka --kafka_topic=dlt-meta-integration-test --kafka_broker=host:9092 --cloud_provider_name=aws --profile=DEFAULT

- - For kafka integration tests, the following are the prerequisites:
1. Needs kafka instance running

- - Following are the mandatory arguments for running the kafka integration test
1. Provide your kafka topic name : --kafka_topic_name
1. Provide your kafka topic name : --kafka_topic
2. Provide kafka_broker : --kafka_broker

- 9d. Run the command for **snapshot**
@@ -77,10 +76,10 @@
```


10. Once finished integration output file will be copied locally to
10. Once finished integration output file will be copied locally to
```integration-test-output_<run_id>.txt```

11. Output of a successful run should have the following in the file
11. Output of a successful run should have the following in the file
```
,0
0,Completed Bronze DLT Pipeline.