feat: added metrics and more settings (#12)
* feat: integrate OpenTelemetry for metrics collection and enhance Kafka settings

* feat: add synthea properties file and update sample data generation script for patient resources

* fix: ensure newline at end of file in synthea.properties and requirements.txt

* refactor: remove unused import for get_meter in fhir_to_lakehouse.py
chgl authored Jan 28, 2025
1 parent ad44fe0 commit 4277635
Showing 6 changed files with 29,407 additions and 18 deletions.
7 changes: 4 additions & 3 deletions compose.yaml
@@ -15,7 +15,7 @@ services:
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_NUM_PARTITIONS: 3
KAFKA_NUM_PARTITIONS: 12
KAFKA_MESSAGE_MAX_BYTES: 104857600 # 100MiB

kafbat-ui:
@@ -49,9 +49,10 @@ services:
restart: on-failure
command:
[
"while true; do kafkacat -X message.max.bytes=104857600 -b kafka:9092 -t fhir.msg -P -l /data/bundles.ndjson; sleep 5; done",
"while true; do kafkacat -X message.max.bytes=104857600 -b kafka:9092 -t fhir.msg -P -l /data/bundles.ndjson; sleep 30; done",
]
volumes:
- ./data/bundles.ndjson:/data/bundles.ndjson:ro
depends_on:
- kafka
kafka:
condition: service_started
29,330 changes: 29,327 additions & 3 deletions data/bundles.ndjson

Large diffs are not rendered by default.

13 changes: 7 additions & 6 deletions data/generate-sample-data.sh
@@ -12,18 +12,19 @@ else
echo "File $SYNTHEA_JAR already exists. Skipping download."
fi

java -jar synthea-with-dependencies.jar -p "10" --exporter.baseDirectory="./synthea"
java -jar synthea-with-dependencies.jar -p "25000" -c synthea.properties

OUTPUT_FILE="bundles.ndjson"

echo "" >"$OUTPUT_FILE"

for file in "./synthea/fhir"/*.json; do
# Ensure the file is a valid JSON file
if [[ -f "$file" ]]; then
# Convert the JSON file into NDJSON format (assuming it's a JSON array)
jq -c '.' "$file" >>"$OUTPUT_FILE"
# Synthea always generates Patient & Encounter resources, but we only care about Patients
jq -c '.entry |= map(
select(.resource.resourceType == "Patient") |
.request.method = "PUT" |
.request.url = "\(.resource.resourceType)/\(.resource.id | sub("^urn:uuid:"; ""))"
)' "$file" >>"$OUTPUT_FILE"
fi
done

# TODO: need to path the transactions entry.request.method to PUT and .url to resourceType/id
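For context, a minimal Python sketch (not from this commit) of what the new jq filter does to each generated bundle: it keeps only Patient entries and rewrites their request to an idempotent PUT keyed by the resource id with the urn:uuid: prefix stripped. The sample bundle, its UUIDs, and the helper name rewrite_entries are illustrative only.

# Sketch of the jq transformation above, applied to a made-up transaction bundle.
import json

bundle = {
    "resourceType": "Bundle",
    "type": "transaction",
    "entry": [
        {
            "resource": {
                "resourceType": "Patient",
                "id": "urn:uuid:123e4567-e89b-12d3-a456-426614174000",
            },
            "request": {"method": "POST", "url": "Patient"},
        },
        {
            "resource": {"resourceType": "Encounter", "id": "urn:uuid:abc"},
            "request": {"method": "POST", "url": "Encounter"},
        },
    ],
}

def rewrite_entries(bundle: dict) -> dict:
    kept = []
    for entry in bundle["entry"]:
        resource = entry["resource"]
        if resource["resourceType"] != "Patient":
            continue  # drop non-Patient entries, mirroring select(...)
        resource_id = resource["id"].removeprefix("urn:uuid:")
        entry["request"] = {
            "method": "PUT",
            "url": f"{resource['resourceType']}/{resource_id}",
        }
        kept.append(entry)
    bundle["entry"] = kept
    return bundle

print(json.dumps(rewrite_entries(bundle), indent=2))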
5 changes: 5 additions & 0 deletions data/synthea.properties
@@ -0,0 +1,5 @@
exporter.fhir.export = true
exporter.hospital.fhir.export = false
exporter.practitioner.fhir.export = false
exporter.fhir.included_resources = Patient
exporter.baseDirectory = ./synthea
67 changes: 61 additions & 6 deletions fhir_to_lakehouse/fhir_to_lakehouse.py
@@ -1,5 +1,6 @@
import os
import sys
import time

import typed_settings as ts
from delta import DeltaTable
@@ -9,14 +10,22 @@
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

from opentelemetry.exporter.prometheus import PrometheusMetricReader
from opentelemetry.metrics import get_meter_provider, set_meter_provider
from opentelemetry.sdk.metrics import MeterProvider

from prometheus_client import start_http_server

HERE = os.path.abspath(os.path.dirname(__file__))


@ts.settings
class KafkaSettings:
bootstrap_servers: str = "localhost:9094"
topics: str = "fhir.msg"
max_offsets_per_trigger: int = 1000
max_offsets_per_trigger: int = 10000
min_offsets_per_trigger: int = 10000
max_trigger_delay: str = "15m"


@ts.settings
@@ -29,6 +38,7 @@ class SparkSettings:
checkpoint_dir: str = "s3a://fhir/checkpoint"
driver_memory: str = "4g"
upkeep_interval: int = 50
streaming_processing_time: str = "0 seconds"


@ts.settings
@@ -39,12 +49,27 @@ class Settings:
aws_secret_access_key: str = ts.secret(default="miniopass")
delta_database_dir: str = "s3a://fhir/warehouse"
vacuum_retention_hours: int = 24
metrics_port: int = 8000
metrics_addr: str = "127.0.0.1"


settings = ts.load(Settings, appname="fhir_to_lakehouse", env_prefix="")

logger.info("Settings: {settings}", settings=settings)

start_http_server(port=settings.metrics_port, addr=settings.metrics_addr)

reader = PrometheusMetricReader()
# Meter is responsible for creating and recording metrics
set_meter_provider(MeterProvider(metric_readers=[reader]))
meter = get_meter_provider().get_meter("fhir_to_lakehouse.instrumentation")

delta_operations_timer = meter.create_histogram(
name="delta-operation-duration",
unit="seconds",
description="Duration of Delta Table operations",
)

# other config can be set via $SPARK_HOME/conf/spark-defaults.conf,
# e.g. compression type.
spark = (
@@ -70,13 +95,17 @@ class Settings:
settings.spark.driver_memory,
)
.config("spark.ui.showConsoleProgress", "false")
.config("spark.ui.prometheus.enabled", "true")
.config(
"spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog",
)
.config("spark.sql.warehouse.dir", settings.spark.warehouse_dir)
.config("spark.databricks.delta.retentionDurationCheck.enabled", "false")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.sql.parquet.compression.codec", "zstd")
.config("parquet.compression.codec.zstd.level", "9")
.config("spark.io.compression.codec", "zstd")
.config(
"spark.hadoop.fs.s3a.path.style.access",
"true",
@@ -110,6 +139,8 @@ class Settings:
.option("groupIdPrefix", "fhir-to-lakehouse")
.option("includeHeaders", "true")
.option("maxOffsetsPerTrigger", str(settings.kafka.max_offsets_per_trigger))
.option("minOffsetsPerTrigger", str(settings.kafka.min_offsets_per_trigger))
.option("maxTriggerDelay", settings.kafka.max_trigger_delay)
.load()
)

@@ -182,8 +213,11 @@ def upsert_to_delta(micro_batch_df: DataFrame, batch_id: int):
)

# TODO: find a way to run this in parallel per resource type
# - order, then partition by resource type (2 consecutive foreachBatch)
for resource_type in resource_types_in_batch:
# TODO: double-check if the sorting here is correct

start = time.perf_counter()
put_df = (
df_result.filter(
f"resource_type = '{resource_type}' and request_method = 'PUT'"
@@ -225,7 +259,10 @@ def upsert_to_delta(micro_batch_df: DataFrame, batch_id: int):
.whenNotMatchedInsertAll()
.execute()
)
end = time.perf_counter()
delta_operations_timer.record(end - start, {"operation": "merge"})

start = time.perf_counter()
delete_df = (
df_result.filter(
f"resource_type = '{resource_type}' and request_method = 'DELETE'"
@@ -248,18 +285,36 @@ def upsert_to_delta(micro_batch_df: DataFrame, batch_id: int):
.whenMatchedDelete()
.execute()
)
end = time.perf_counter()
delta_operations_timer.record(end - start, {"operation": "delete"})

# TODO: should vacuum all tables, not just the ones in the batch
if batch_id % settings.spark.upkeep_interval == 0:
logger.info("Optimizing and vacuuming table")
delta_table.detail().show()
delta_table.optimize().executeCompaction()
delta_table.vacuum(retentionHours=settings.vacuum_retention_hours)
optimize_and_vacuum_table(delta_table)


def optimize_and_vacuum_table(delta_table: DeltaTable):
logger.info("Optimizing and vacuuming table")
delta_table.detail().show(truncate=False)

start = time.perf_counter()
optimize_df = delta_table.optimize().executeCompaction()
end = time.perf_counter()
delta_operations_timer.record(end - start, {"operation": "optimize"})

optimize_df.show(truncate=False)

start = time.perf_counter()
delta_table.vacuum(retentionHours=settings.vacuum_retention_hours)
end = time.perf_counter()
delta_operations_timer.record(end - start, {"operation": "vacuum"})


# Write the output of a streaming aggregation query into Delta table
df.writeStream.option("checkpointLocation", settings.spark.checkpoint_dir).foreachBatch(
upsert_to_delta
).outputMode("update").queryName("fhir_bundles_to_delta_tables").start()
).outputMode("update").queryName("fhir_bundles_to_delta_tables").trigger(
processingTime=settings.spark.streaming_processing_time
).start()

spark.streams.awaitAnyTermination()
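As a usage note, here is a minimal sketch (not from this commit) for checking that the newly exposed metrics endpoint is serving data. It assumes the default metrics_addr/metrics_port values from the Settings class above, that at least one micro-batch has run so the histogram has been recorded, and that the Prometheus exporter normalizes the dashes in delta-operation-duration to underscores.

# Sketch: fetch the Prometheus scrape endpoint started by start_http_server and
# print any lines for the delta operation histogram. Host/port are the defaults
# from Settings; adjust if METRICS_ADDR/METRICS_PORT were overridden.
from urllib.request import urlopen

with urlopen("http://127.0.0.1:8000/metrics", timeout=5) as response:
    body = response.read().decode("utf-8")

for line in body.splitlines():
    if "delta_operation_duration" in line:
        print(line)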
3 changes: 3 additions & 0 deletions requirements.txt
@@ -4,3 +4,6 @@ delta-spark==3.2.0
typed-settings==24.6.0
typed-settings[attrs]==24.6.0
loguru==0.7.3
opentelemetry-api==1.29.0
opentelemetry-sdk==1.29.0
opentelemetry-exporter-prometheus==0.50b0
