Modified the demo files for DAIS and eventhub
sravanivadali committed Nov 1, 2024
1 parent 221918b commit 3a9b41a
Showing 8 changed files with 185 additions and 57 deletions.
48 changes: 20 additions & 28 deletions demo/launch_af_eventhub_demo.py
@@ -48,49 +48,41 @@ def init_runner_conf(self) -> DLTMetaRunnerConf:
runner_conf = DLTMetaRunnerConf(
run_id=run_id,
username=self.wsi._my_username,
int_tests_dir="file:./demo",
int_tests_dir="demo",
dlt_meta_schema=f"dlt_meta_dataflowspecs_demo_{run_id}",
bronze_schema=f"dlt_meta_bronze_demo_{run_id}",
runners_nb_path=f"/Users/{self.wsi._my_username}/dlt_meta_demo/{run_id}",
source="eventhub",
eventhub_template="demo/conf/eventhub-onboarding.template",
onboarding_file_path="demo/conf/onboarding.json",
env="demo"
env="demo",
# eventhub provided args
eventhub_name=self.args["eventhub_name"],
eventhub_name_append_flow=self.args["eventhub_name_append_flow"],
eventhub_producer_accesskey_name=self.args["eventhub_producer_accesskey_name"],
eventhub_consumer_accesskey_name=self.args[
"eventhub_consumer_accesskey_name"
],
eventhub_accesskey_secret_name=self.args["eventhub_accesskey_secret_name"],
eventhub_secrets_scope_name=self.args["eventhub_secrets_scope_name"],
eventhub_namespace=self.args["eventhub_namespace"],
eventhub_port=self.args["eventhub_port"]
)
runner_conf.uc_catalog_name = self.args.__dict__['uc_catalog_name']
runner_conf.runners_full_local_path = './demo/dbc/afam_eventhub_runners.dbc'
runner_conf.uc_catalog_name = self.args['uc_catalog_name']
runner_conf.runners_full_local_path = 'demo/notebooks/afam_eventhub_runners'
return runner_conf

def launch_workflow(self, runner_conf: DLTMetaRunnerConf):
created_job = self.create_eventhub_workflow_spec(runner_conf)
created_job = self.create_workflow_spec(runner_conf)
self.open_job_url(runner_conf, created_job)
return created_job


afam_args_map = {
"--profile": "provide databricks cli profile name, if not provide databricks_host and token",
"--uc_catalog_name": "provide databricks uc_catalog name, this is required to create volume, schema, table",
"--eventhub_name": "Provide eventhub_name e.g --eventhub_name=iot",
"--eventhub_name_append_flow": "Provide eventhub_name_append_flow e.g --eventhub_name_append_flow=iot_af",
"--eventhub_producer_accesskey_name": "Provide access key that has write permission on the eventhub",
"--eventhub_consumer_accesskey_name": "Provide access key that has read permission on the eventhub",
"--eventhub_secrets_scope_name": "Provide eventhub_secrets_scope_name e.g \
--eventhub_secrets_scope_name=eventhubs_creds",
"--eventhub_accesskey_secret_name": "Provide eventhub_accesskey_secret_name e.g \
-eventhub_accesskey_secret_name=RootManageSharedAccessKey",
"--eventhub_namespace": "Provide eventhub_namespace e.g --eventhub_namespace=topic-standard",
"--eventhub_port": "Provide eventhub_port e.g --eventhub_port=9093",
}

afeh_mandatory_args = ["uc_catalog_name", "eventhub_name",
"eventhub_name_append_flow", "eventhub_producer_accesskey_name",
"eventhub_consumer_accesskey_name", "eventhub_secrets_scope_name",
"eventhub_namespace", "eventhub_port"]


def main():
args = process_arguments(afam_args_map, afeh_mandatory_args)
workspace_client = get_workspace_api_client(args.profile)
args = process_arguments()
workspace_client = get_workspace_api_client(args['profile'])
dltmeta_afam_demo_runner = DLTMETAFEHDemo(args, workspace_client, "demo")
print("initializing complete")
runner_conf = dltmeta_afam_demo_runner.init_runner_conf()
43 changes: 17 additions & 26 deletions demo/launch_dais_demo.py
@@ -37,21 +37,22 @@ def init_runner_conf(self) -> DLTMetaRunnerConf:
runner_conf = DLTMetaRunnerConf(
run_id=run_id,
username=self._my_username(self.ws),
int_tests_dir="file:./demo",
int_tests_dir="demo",
dlt_meta_schema=f"dlt_meta_dataflowspecs_demo_{run_id}",
bronze_schema=f"dlt_meta_bronze_dais_demo_{run_id}",
silver_schema=f"dlt_meta_silver_dais_demo_{run_id}",
runners_nb_path=f"/Users/{self._my_username(self.ws)}/dlt_meta_dais_demo/{run_id}",
runners_nb_path=f"/Users/{self.wsi._my_username}/dlt_meta_dais_demo/{run_id}",
runners_full_local_path="demo/notebooks/dais_runners",
# node_type_id=cloud_node_type_id_dict[self.args.__dict__['cloud_provider_name']],
# dbr_version=self.args.__dict__['dbr_version'],
cloudfiles_template="demo/conf/onboarding.template",
env="prod",
source="cloudFiles",
runners_full_local_path='./demo/dbc/dais_dlt_meta_runners.dbc',
source="cloudfiles",
#runners_full_local_path='./demo/dbc/dais_dlt_meta_runners.dbc',
onboarding_file_path='demo/conf/onboarding.json'
)
if self.args.__dict__['uc_catalog_name']:
runner_conf.uc_catalog_name = self.args.__dict__['uc_catalog_name']
if self.args['uc_catalog_name']:
runner_conf.uc_catalog_name = self.args['uc_catalog_name']
runner_conf.uc_volume_name = f"{runner_conf.uc_catalog_name}_dais_demo_{run_id}"

return runner_conf
@@ -93,14 +94,13 @@ def create_daisdemo_workflow(self, runner_conf: DLTMetaRunnerConf):
Returns:
- created_job: created job object
"""
database, dlt_lib = self.init_db_dltlib(runner_conf)
dltmeta_environments = [
jobs.JobEnvironment(
environment_key="dlt_meta_dais_demo_env",
spec=compute.Environment(client=f"dlt_meta_int_test_{__version__}",
# dependencies=[f"dlt_meta=={__version__}"],
dependencies=["dlt_meta==0.0.8"],
)
environment_key="dl_meta_int_env",
spec=compute.Environment(
client=f"dlt_meta_int_test_{__version__}",
dependencies=[runner_conf.remote_whl_path],
),
)
]
return self.ws.jobs.create(
@@ -110,14 +110,14 @@ def create_daisdemo_workflow(self, runner_conf: DLTMetaRunnerConf):
jobs.Task(
task_key="setup_dlt_meta_pipeline_spec",
description="test",
environment_key="dlt_meta_dais_demo_env",
environment_key="dl_meta_int_env",
timeout_seconds=0,
python_wheel_task=jobs.PythonWheelTask(
package_name="dlt_meta",
entry_point="run",
named_parameters={
"onboard_layer": "bronze_silver",
"database": database,
"database": f"{runner_conf.uc_catalog_name}.{runner_conf.dlt_meta_schema}",
"onboarding_file_path": f"{runner_conf.uc_volume_path}/demo/conf/onboarding.json",
"silver_dataflowspec_table": "silver_dataflowspec_cdc",
"silver_dataflowspec_path": (
@@ -154,7 +154,7 @@ def create_daisdemo_workflow(self, runner_conf: DLTMetaRunnerConf):
description="Load Incremental Data",
depends_on=[jobs.TaskDependency(task_key="silver_initial_run")],
notebook_task=jobs.NotebookTask(
notebook_path=f"{runner_conf.runners_nb_path}/runners/load_incremental_data",
notebook_path=f"{runner_conf.runners_nb_path}/runners/load_incremental_data.py",
base_parameters={
"dbfs_tmp_path": runner_conf.uc_volume_path
}
@@ -179,19 +179,10 @@ def create_daisdemo_workflow(self, runner_conf: DLTMetaRunnerConf):
)


dais_args_map = {"--profile": "provide databricks cli profile name, if not provide databricks_host and token",
"--uc_catalog_name": "provide databricks uc_catalog name, \
this is required to create volume, schema, table",
"--cloud_provider_name": "provide cloud provider name. Supported values are aws , azure , gcp"
}

dais_mandatory_args = ["uc_catalog_name", "cloud_provider_name"]


def main():
"""Entry method to run integration tests."""
args = process_arguments(dais_args_map, dais_mandatory_args)
workspace_client = get_workspace_api_client(args.profile)
args = process_arguments()
workspace_client = get_workspace_api_client(args['profile'])
dltmeta_dais_demo_runner = DLTMETADAISDemo(args, workspace_client, "demo")
runner_conf = dltmeta_dais_demo_runner.init_runner_conf()
dltmeta_dais_demo_runner.run(runner_conf)
10 changes: 10 additions & 0 deletions demo/notebooks/afam_eventhub_runners/init_dlt_meta_pipeline.py
@@ -0,0 +1,10 @@
# Databricks notebook source
dlt_meta_whl = spark.conf.get("dlt_meta_whl")
%pip install $dlt_meta_whl # noqa : E999

# COMMAND ----------

layer = spark.conf.get("layer", None)

from src.dataflow_pipeline import DataflowPipeline
DataflowPipeline.invoke_dlt_pipeline(spark, layer)
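A note for context (not part of this commit): the notebook above reads dlt_meta_whl and layer from the DLT pipeline's Spark configuration. Below is a minimal sketch of how a pipeline could supply those values through the Databricks SDK; the pipeline name, catalog, target schema, notebook path, and wheel path are all placeholder assumptions.

# Hypothetical pipeline creation that passes "layer" and "dlt_meta_whl" via configuration.
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import pipelines

ws = WorkspaceClient()
created = ws.pipelines.create(
    name="dlt_meta_bronze_demo",          # placeholder name
    catalog="my_catalog",                 # placeholder UC catalog
    target="my_bronze_schema",            # placeholder target schema
    configuration={
        "layer": "bronze",                # read via spark.conf.get("layer", None)
        "dlt_meta_whl": "/Volumes/my_catalog/my_schema/files/dlt_meta-0.0.8-py3-none-any.whl",
    },
    libraries=[
        pipelines.PipelineLibrary(
            notebook=pipelines.NotebookLibrary(
                path="/Users/someone@example.com/dlt_meta_demo/init_dlt_meta_pipeline"
            )
        )
    ],
)
print(created.pipeline_id)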
76 changes: 76 additions & 0 deletions demo/notebooks/afam_eventhub_runners/publish_events.py
@@ -0,0 +1,76 @@
# Databricks notebook source
# MAGIC %md
# MAGIC ## Install azure-eventhub

# COMMAND ----------

# MAGIC %sh pip install azure-eventhub

# COMMAND ----------

dbutils.library.restartPython()

# COMMAND ----------

dbutils.widgets.text("eventhub_name","eventhub_name","")
dbutils.widgets.text("eventhub_name_append_flow","eventhub_name_append_flow","")
dbutils.widgets.text("eventhub_namespace","eventhub_namespace","")
dbutils.widgets.text("eventhub_secrets_scope_name","eventhub_secrets_scope_name","")
dbutils.widgets.text("eventhub_accesskey_name","eventhub_accesskey_name","")
dbutils.widgets.text("eventhub_input_data","eventhub_input_data","")
dbutils.widgets.text("eventhub_append_flow_input_data","eventhub_append_flow_input_data","")

# COMMAND ----------

eventhub_name = dbutils.widgets.get("eventhub_name")
eventhub_name_append_flow = dbutils.widgets.get("eventhub_name_append_flow")
eventhub_namespace = dbutils.widgets.get("eventhub_namespace")
eventhub_secrets_scope_name = dbutils.widgets.get("eventhub_secrets_scope_name")
eventhub_accesskey_name = dbutils.widgets.get("eventhub_accesskey_name")
eventhub_input_data = dbutils.widgets.get("eventhub_input_data")
eventhub_append_flow_input_data = dbutils.widgets.get("eventhub_append_flow_input_data")

# COMMAND ----------

print(f"eventhub_name={eventhub_name}, eventhub_name_append_flow={eventhub_name_append_flow}, eventhub_namespace={eventhub_namespace}, eventhub_secrets_scope_name={eventhub_secrets_scope_name}, eventhub_accesskey_name={eventhub_accesskey_name}, eventhub_input_data={eventhub_input_data}, eventhub_append_flow_input_data={eventhub_append_flow_input_data}")

# COMMAND ----------

import json
from azure.eventhub import EventHubProducerClient, EventData

eventhub_shared_access_value = dbutils.secrets.get(scope = eventhub_secrets_scope_name, key = eventhub_accesskey_name)
eventhub_conn = f"Endpoint=sb://{eventhub_namespace}.servicebus.windows.net/;SharedAccessKeyName={eventhub_accesskey_name};SharedAccessKey={eventhub_shared_access_value}"

client = EventHubProducerClient.from_connection_string(eventhub_conn, eventhub_name=eventhub_name)



# COMMAND ----------

# MAGIC %md
# MAGIC ## Publish iot data to eventhub

# COMMAND ----------

with open(f"{eventhub_input_data}") as f:
data = json.load(f)

# Send one batch per event while the producer connection is open.
with client:
    for event in data:
        event_data_batch = client.create_batch()
        event_data_batch.add(EventData(json.dumps(event)))
        client.send_batch(event_data_batch)

# COMMAND ----------

append_flow_client = EventHubProducerClient.from_connection_string(eventhub_conn, eventhub_name=eventhub_name_append_flow)

with open(f"{eventhub_append_flow_input_data}") as f:
af_data = json.load(f)

# Use the append-flow producer consistently for batch creation and sending.
with append_flow_client:
    for event in af_data:
        event_data_batch = append_flow_client.create_batch()
        event_data_batch.add(EventData(json.dumps(event)))
        append_flow_client.send_batch(event_data_batch)
38 changes: 38 additions & 0 deletions demo/notebooks/afam_eventhub_runners/validate.py
@@ -0,0 +1,38 @@
# Databricks notebook source
import pandas as pd

run_id = dbutils.widgets.get("run_id")
uc_enabled = eval(dbutils.widgets.get("uc_enabled"))
uc_catalog_name = dbutils.widgets.get("uc_catalog_name")
output_file_path = dbutils.widgets.get("output_file_path")
bronze_schema = dbutils.widgets.get("bronze_schema")
log_list = []

# Assumption: reaching this notebook means the Bronze and Silver pipelines completed successfully
log_list.append("Completed Bronze Eventhub DLT Pipeline.")

UC_TABLES = {
f"{uc_catalog_name}.{bronze_schema}.bronze_{run_id}_iot": 20,
f"{uc_catalog_name}.{bronze_schema}.bronze_{run_id}_iot_quarantine": 2
}

NON_UC_TABLES = {
f"{uc_catalog_name}.{bronze_schema}.bronze_{run_id}_iot": 20,
f"{uc_catalog_name}.{bronze_schema}.bronze_{run_id}_iot_quarantine": 2
}

log_list.append("Validating DLT EVenthub Bronze Table Counts...")
tables = UC_TABLES if uc_enabled else NON_UC_TABLES
for table, counts in tables.items():
query = spark.sql(f"SELECT count(*) as cnt FROM {table}")
cnt = query.collect()[0].cnt

log_list.append(f"Validating Counts for Table {table}.")
try:
assert int(cnt) >= counts
log_list.append(f"Expected >= {counts} Actual: {cnt}. Passed!")
except AssertionError:
log_list.append(f"Expected > {counts} Actual: {cnt}. Failed!")

pd_df = pd.DataFrame(log_list)
pd_df.to_csv(output_file_path)
10 changes: 10 additions & 0 deletions demo/notebooks/dais_runners/init_dlt_meta_pipeline.py
@@ -0,0 +1,10 @@
# Databricks notebook source
dlt_meta_whl = spark.conf.get("dlt_meta_whl")
%pip install $dlt_meta_whl # noqa : E999

# COMMAND ----------

layer = spark.conf.get("layer", None)

from src.dataflow_pipeline import DataflowPipeline
DataflowPipeline.invoke_dlt_pipeline(spark, layer)
10 changes: 10 additions & 0 deletions demo/notebooks/dais_runners/load_incremental_data.py
@@ -0,0 +1,10 @@
# Databricks notebook source
dbfs_tmp_path = dbutils.widgets.get("dbfs_tmp_path")

# COMMAND ----------

source_base_path = f"{dbfs_tmp_path}/demo/resources/incremental_data/"
target_base_path = f"{dbfs_tmp_path}/demo/resources/data/"
domains = ["customers","transactions","stores","products"]
for domain in domains:
dbutils.fs.cp(f"{source_base_path}{domain}/",f"{target_base_path}{domain}/",True)
7 changes: 4 additions & 3 deletions integration_tests/run_integration_tests.py
@@ -110,7 +110,7 @@ class DLTMetaRunnerConf:
job_id: str = None
test_output_file_path: str = None
onboarding_fanout_templates: str = None # "demo/conf/onboarding_fanout_cars.template",
onboarding_file_path: str = None # "demo/conf/onboarding_cars.json",
#onboarding_file_path: str = None # "demo/conf/onboarding_cars.json",
onboarding_fanout_file_path: str = None # "demo/conf/onboarding_fanout_cars.json",

# cloudfiles info
@@ -120,7 +120,7 @@ class DLTMetaRunnerConf:
)

# eventhub info
eventhub_template: str = ("integration_tests/conf/eventhub-onboarding.template",)
eventhub_template: str = "integration_tests/conf/eventhub-onboarding.template"
eventhub_input_data: str = None
eventhub_append_flow_input_data: str = None
eventhub_name: str = None
@@ -496,7 +496,7 @@ def get_validate_task_key(self, source):
elif source == "snapshot":
return "bronze_v3_dlt_pipeline"
else:
return "publish_events"
return "bronze_dlt_pipeline"

def initialize_uc_resources(self, runner_conf):
"""Create UC schemas and volumes needed to run the integration tests"""
@@ -581,6 +581,7 @@ def generate_onboarding_file(self, runner_conf: DLTMetaRunnerConf):
onboard_json_a2 = f.read()

for key, val in string_subs.items():
val = "" if val is None else val # Ensure val is a string
onboard_json = onboard_json.replace(key, val)
if runner_conf.source == "cloudfiles":
onboard_json_a2 = onboard_json_a2.replace(key, val)
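For illustration only (not part of this commit), a minimal sketch of why the new None guard in generate_onboarding_file is needed: str.replace() raises a TypeError when the replacement value is None, so missing substitutions are coerced to empty strings before templating. All keys and values below are hypothetical placeholders.

# Hypothetical template substitution mirroring the loop above.
template = '{"catalog": "{uc_catalog_name}", "eventhub_name": "{eventhub_name}"}'
string_subs = {"{uc_catalog_name}": "my_catalog", "{eventhub_name}": None}

onboard_json = template
for key, val in string_subs.items():
    val = "" if val is None else val  # str.replace(key, None) would raise TypeError
    onboard_json = onboard_json.replace(key, val)

print(onboard_json)  # {"catalog": "my_catalog", "eventhub_name": ""}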
