Merge pull request #10 from databrickslabs/amir-fix-tests
simplifying tests based on pytest #4
kermany authored Sep 7, 2022
2 parents 15515e7 + 2b1e3c0 commit 4f3443d
Showing 5 changed files with 97 additions and 98 deletions.
39 changes: 35 additions & 4 deletions README.md
@@ -47,7 +47,15 @@ _DataModels_ hold the state of an interoperable _DataModel_
such as FHIR bundles or OMOP CDM. The _Transformer_ class contains
pre-defined transformations from one data model to another.
> [see Transformers](#transformers)
[![](https://mermaid.ink/img/pako:eNptkL0OAjEIx1_lwqRRX6Bx0tNJo4lrF7yi16QfprSD0btnFz8HlYHAnx8EuEATDYGCxiFzbfGY0OtQidWYcS1FV02vk0m1pcQx1MjtPmIyf5mNj6e5-d-_bG2alWAc8VddVSMu3mM6D4YPdSk70C_kLOd7vkcmFnQlef8Reh2eLU8PY_CUPFojx13umobckswFJaGhAxaXNejQCVpOBjMtjM0xgTqgYxoDlhx359CAyqnQG3r96EV1N2GYavQ)](https://mermaid-js.github.io/mermaid-live-editor/edit/#pako:eNptkL0OAjEIx1_lwqRRX6Bx0tNJo4lrF7yi16QfprSD0btnFz8HlYHAnx8EuEATDYGCxiFzbfGY0OtQidWYcS1FV02vk0m1pcQx1MjtPmIyf5mNj6e5-d-_bG2alWAc8VddVSMu3mM6D4YPdSk70C_kLOd7vkcmFnQlef8Reh2eLU8PY_CUPFojx13umobckswFJaGhAxaXNejQCVpOBjMtjM0xgTqgYxoDlhx359CAyqnQG3r96EV1N2GYavQ)
```mermaid
classDiagram
DataModel <|-- PersonDashboard
DataModel <|-- OmopCdm
DataModel <|-- FhirBundles
DataModel: +summary() DataFrame
DataModel: +listDatabases() List~Databases~
```
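For orientation, here is a minimal sketch of how these _DataModel_ classes are constructed, using the class names that appear in the tests in this commit; the path and database name are illustrative, not canonical:

```python
# Sketch only: constructor arguments are illustrative.
from dbignite.data_model import FhirBundles, OmopCdm, PersonDashboard

# Concrete DataModels wrap a source location or a backing database.
fhir_model = FhirBundles("./sampledata/")      # directory of FHIR bundle JSON files
cdm_model = OmopCdm("example_cdm_database")    # OMOP CDM database name
person_dashboard = PersonDashboard()

# Shared interface from the diagram above.
print(cdm_model.listDatabases())               # databases backing this model
```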

## Transformers
One of the key challenges for interoperability of data models is a
@@ -56,15 +64,38 @@ cross compatibility with many other data models. Intermediate
data models can mitigate this issue.

### The "Many to Many" Problem
[![](https://mermaid.ink/img/eyJjb2RlIjoiZ3JhcGggTFJcbiAgICBBW0FdIC0tPnx0cmFuc2Zvcm18IEIoQilcbiAgICBBW0FdIC0tPnx0cmFuc2Zvcm18IEMoQylcbiAgICBCW0JdIC0tPnx0cmFuc2Zvcm18IEEoQSlcbiAgICBCW0JdIC0tPnx0cmFuc2Zvcm18IEMoQylcbiAgICBDW0NdIC0tPnx0cmFuc2Zvcm18IEEoQSlcbiAgICBDW0NdIC0tPnx0cmFuc2Zvcm18IEIoQilcbiIsIm1lcm1haWQiOnsidGhlbWUiOiJkZWZhdWx0In0sInVwZGF0ZUVkaXRvciI6ZmFsc2UsImF1dG9TeW5jIjp0cnVlLCJ1cGRhdGVEaWFncmFtIjpmYWxzZX0)](https://mermaid.live/edit#eyJjb2RlIjoiZ3JhcGggTFJcbiAgICBBW0FdIC0tPnx0cmFuc2Zvcm18IEIoQilcbiAgICBBW0FdIC0tPnx0cmFuc2Zvcm18IEMoQylcbiAgICBCW0JdIC0tPnx0cmFuc2Zvcm18IEEoQSlcbiAgICBCW0JdIC0tPnx0cmFuc2Zvcm18IEMoQylcbiAgICBDW0NdIC0tPnx0cmFuc2Zvcm18IEEoQSlcbiAgICBDW0NdIC0tPnx0cmFuc2Zvcm18IEIoQilcbiIsIm1lcm1haWQiOiJ7XG4gIFwidGhlbWVcIjogXCJkZWZhdWx0XCJcbn0iLCJ1cGRhdGVFZGl0b3IiOmZhbHNlLCJhdXRvU3luYyI6dHJ1ZSwidXBkYXRlRGlhZ3JhbSI6ZmFsc2V9)

```mermaid
graph LR
A[A] -->|transform| B(B)
A[A] -->|transform| C(C)
B[B] -->|transform| A(A)
B[B] -->|transform| C(C)
C[C] -->|transform| A(A)
C[C] -->|transform| B(B)
```

Pipelining existing transforms can significantly simplify
the problem of mapping a variety of data models.
[![](https://mermaid.ink/img/eyJjb2RlIjoiZ3JhcGggTFJcbiAgICBYW0FdIC0tLXx0cmFuc2Zvcm18IEkoSW50ZXJtZWRpYXRlIERhdGFNb2RlbCBYKVxuICAgIFlbQl0gLS0tfHRyYW5zZm9ybXwgSShJbnRlcm1lZGlhdGUgRGF0YU1vZGVsIFgpXG4gICAgWltDXSAtLS18dHJhbnNmb3JtfCBJKEludGVybWVkaWF0ZSBEYXRhTW9kZWwgWClcbiIsIm1lcm1haWQiOnsidGhlbWUiOiJkZWZhdWx0In0sInVwZGF0ZUVkaXRvciI6ZmFsc2UsImF1dG9TeW5jIjp0cnVlLCJ1cGRhdGVEaWFncmFtIjpmYWxzZX0)](https://mermaid.live/edit#eyJjb2RlIjoiZ3JhcGggTFJcbiAgICBYW0FdIC0tLXx0cmFuc2Zvcm18IEkoSW50ZXJtZWRpYXRlIERhdGFNb2RlbCBYKVxuICAgIFlbQl0gLS0tfHRyYW5zZm9ybXwgSShJbnRlcm1lZGlhdGUgRGF0YU1vZGVsIFgpXG4gICAgWltDXSAtLS18dHJhbnNmb3JtfCBJKEludGVybWVkaWF0ZSBEYXRhTW9kZWwgWClcbiIsIm1lcm1haWQiOiJ7XG4gIFwidGhlbWVcIjogXCJkZWZhdWx0XCJcbn0iLCJ1cGRhdGVFZGl0b3IiOmZhbHNlLCJhdXRvU3luYyI6dHJ1ZSwidXBkYXRlRGlhZ3JhbSI6ZmFsc2V9)

```mermaid
graph LR
X[A] ---|transform| I(Intermediate DataModel X)
Y[B] ---|transform| I(Intermediate DataModel X)
Z[C] ---|transform| I(Intermediate DataModel X)
```

### Making Pipelines with Simple Composition
The _Transformer_ class contains pre-built pipelines for converting between data model instances.
Using methods in _Transformers_ we can transform one data model into another.
This pattern also allows simple composition of transformations.

[![](https://mermaid.ink/img/pako:eNp1kcFqwzAMhl9F-NTQ9AXMTlvW29igOQaKGitNwLGCrGyMru8-hzS0dJ1PsvX5Qz86mZodGWtqjzEWHR4F-ypAOqVgiA1LTwJPP5sNbNtOnsfgPMWSX9x_WOqU_EESORQY2wOjuD-ohbVndHsKKh3FvWtWu1SFIwyobQYFKm7TKPToJ-hyXU3cW0rgIfIoNeVwfVGUI2k2G-6Ht-ur5KYHjXCfw3vPQ4JAOQf-JPmSTgkOzD6bZLPyUdBb7SKZlXdgUmcmNylQj51LCzhN0spoSym0sal01ODotTJVOCd0HBwqvbpOWYxt0EfKDY7Ku-9QG6sy0gJd9nihzr-_uKmr)](https://mermaid.live/edit#pako:eNp1kcFqwzAMhl9F-NTQ9AXMTlvW29igOQaKGitNwLGCrGyMru8-hzS0dJ1PsvX5Qz86mZodGWtqjzEWHR4F-ypAOqVgiA1LTwJPP5sNbNtOnsfgPMWSX9x_WOqU_EESORQY2wOjuD-ohbVndHsKKh3FvWtWu1SFIwyobQYFKm7TKPToJ-hyXU3cW0rgIfIoNeVwfVGUI2k2G-6Ht-ur5KYHjXCfw3vPQ4JAOQf-JPmSTgkOzD6bZLPyUdBb7SKZlXdgUmcmNylQj51LCzhN0spoSym0sal01ODotTJVOCd0HBwqvbpOWYxt0EfKDY7Ku-9QG6sy0gJd9nihzr-_uKmr)
```mermaid
classDiagram
DataModel <|-- PersonDashboard
Transformer <|-- FhirBundlesToCdm
Transformer <|-- CdmToPersonDashboard
Transformer: +load_entries_df(String path) DataFrame
Transformer: +transform(DataModel source, DataModel target)
FhirBundlesToCdm: +transform(FhirBundles from, OmopCdm to, overwrite bool)
CdmToPersonDashboard: +transform(OmopCdm from, PersonDashboard to)
```
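
Concretely, the two transformers above compose into a FHIR-to-dashboard pipeline. The sketch below mirrors the usage in `tests/test_utils.py` later in this commit; the Spark session setup, path, and database name are illustrative (the tests configure Delta through `pytest.ini` instead):

```python
from pyspark.sql import SparkSession
from dbignite.data_model import (
    FhirBundles, OmopCdm, PersonDashboard,
    FhirBundlesToCdm, CdmToPersonDashboard,
)

spark = SparkSession.builder.master("local").getOrCreate()

fhir_model = FhirBundles("./sampledata/")
cdm_model = OmopCdm("example_cdm_database")
person_dash_model = PersonDashboard()

# Stage 1: FHIR bundles -> OMOP CDM (the final True overwrites existing tables).
FhirBundlesToCdm(spark).transform(fhir_model, cdm_model, True)
# Stage 2: OMOP CDM -> person-level dashboard.
CdmToPersonDashboard(spark).transform(cdm_model, person_dash_model)

person_dashboard_df = person_dash_model.summary()
```
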
1 change: 0 additions & 1 deletion dbignite/data_model.py
@@ -81,7 +81,6 @@ def loadEntries(self) -> DataFrame:
    def transform(self) -> DataModel:
        ...


class FhirBundlesToCdm(Transformer):

    def __init__(self, spark):
8 changes: 8 additions & 0 deletions pytest.ini
@@ -0,0 +1,8 @@
[pytest]
filterwarnings =
    ignore::DeprecationWarning
spark_options =
    spark.sql.extensions: io.delta.sql.DeltaSparkSessionExtension
    spark.sql.catalog.spark_catalog: org.apache.spark.sql.delta.catalog.DeltaCatalog
    spark.jars.packages: io.delta:delta-core_2.12:2.1.0
    spark.sql.catalogImplementation: in-memory
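
For context, the `spark_options` block is read by the `pytest-spark` plugin (added to `requirements.txt` below), which exposes a preconfigured `spark_session` fixture to the tests. A minimal sketch of a test consuming that fixture, assuming the options above are in effect:

```python
# Sketch: spark_session is pytest-spark's built-in fixture;
# the asserted value comes from the spark_options in pytest.ini.
def test_session_has_delta_extension(spark_session):
    extensions = spark_session.conf.get("spark.sql.extensions")
    assert "io.delta.sql.DeltaSparkSessionExtension" in extensions
```
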
8 changes: 5 additions & 3 deletions requirements.txt
@@ -1,3 +1,5 @@
pyarrow==8.0.0
pyspark==3.2.1

pyspark==3.3.0
delta-spark==2.1.0
pytest-spark==0.6.0
spark-testing-base==0.10.0
chispa==0.9.2
139 changes: 49 additions & 90 deletions tests/test_utils.py
@@ -1,125 +1,84 @@
from pyspark.sql import SparkSession
import os
import re
import pytest

from chispa import *
from chispa.schema_comparer import *

from dbignite.data_model import *
from dbignite.utils import *
from dbignite.schemas import *

REPO = os.environ.get("REPO", "dbignite")
BRANCH = re.sub(r"\W+", "", os.environ["BRANCH"])
BRANCH = re.sub(r"\W+", "", os.environ.get("BRANCH", 'local_test'))

TEST_BUNDLE_PATH = "./sampledata/"
TEST_DATABASE = f"test_{REPO}_{BRANCH}"


##TODO Add content-based data tests

class SparkTest():
    ##
    ## Fixtures
    ##
    def setup_class(self) -> None:
        self.spark = (SparkSession.builder.appName("myapp")
                      .config("spark.jars.packages", "io.delta:delta-core_2.12:1.1.0")
                      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
                      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
                      .config("spark.driver.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true")
                      .config("spark.executor.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true")
                      .master("local")
                      .getOrCreate())
        self.spark.conf.set("spark.sql.shuffle.partitions", 1)

    def teardown_class(self) -> None:
        self.spark.sql(f'DROP DATABASE IF EXISTS {TEST_DATABASE} CASCADE')

    def assertSchemasEqual(self, schemaA: StructType, schemaB: StructType) -> None:
        """
        Test that the two given schemas are equivalent (column ordering ignored)
        """
        # both schemas must have the same length
        assert len(schemaA.fields) == len(schemaB.fields)
        # schemaA must equal schemaB
        assert schemaA.simpleString() == schemaB.simpleString()

    def assertHasSchema(self, df: DataFrame, expectedSchema: StructType) -> None:
        """
        Test that the given Dataframe conforms to the expected schema
        """
        assert df.schema == expectedSchema

    def assertDataFramesEqual(self, dfA: DataFrame, dfB: DataFrame) -> None:
        """
        Test that the two given Dataframes are equivalent.
        That is, they have equivalent schemas, and both contain the same values
        """
        # must have the same schemas
        assert dfA.schema == dfB.schema
        # enforce a common column ordering
        colOrder = sorted(dfA.columns)
        sortedA = dfA.select(colOrder)
        sortedB = dfB.select(colOrder)
        # must have identical data
        # that is all rows in A must be in B, and vice-versa
        assert sortedA.subtract(sortedB).count() == 0
        assert sortedB.subtract(sortedA).count() == 0


class TestUtils(SparkTest):

    def test_entries_to_person(self) -> None:
        entries_df = FhirBundlesToCdm(self.spark).loadEntries(TEST_BUNDLE_PATH)
        person_df = entries_to_person(entries_df)
@pytest.fixture
def get_entries_df(spark_session):
    entries_df = FhirBundlesToCdm(spark_session).loadEntries(TEST_BUNDLE_PATH)
    return entries_df

@pytest.fixture
def fhir_model():
    fhir_model = FhirBundles(TEST_BUNDLE_PATH)
    return fhir_model

@pytest.fixture
def cdm_model():
    cdm_model = OmopCdm(TEST_DATABASE)
    return cdm_model

class TestUtils:

    def test_entries_to_person(self, get_entries_df) -> None:
        person_df = entries_to_person(get_entries_df)
        assert person_df.count() == 3
        self.assertSchemasEqual(person_df.schema, PERSON_SCHEMA)
        assert_schema_equality(person_df.schema, PERSON_SCHEMA, ignore_nullable=True)


    def test_entries_to_condition(self) -> None:
        entries_df = FhirBundlesToCdm(self.spark).loadEntries(TEST_BUNDLE_PATH)
        condition_df = entries_to_condition(entries_df)
    def test_entries_to_condition(self, get_entries_df) -> None:
        condition_df = entries_to_condition(get_entries_df)
        assert condition_df.count() == 103
        self.assertSchemasEqual(condition_df.schema, CONDITION_SCHEMA)
        assert_schema_equality(condition_df.schema, CONDITION_SCHEMA, ignore_nullable=True)

    def test_entries_to_procedure_occurrence(self) -> None:
        entries_df = FhirBundlesToCdm(self.spark).loadEntries(TEST_BUNDLE_PATH)
        procedure_occurrence_df = entries_to_procedure_occurrence(entries_df)
    def test_entries_to_procedure_occurrence(self, get_entries_df) -> None:
        procedure_occurrence_df = entries_to_procedure_occurrence(get_entries_df)
        assert procedure_occurrence_df.count() == 119
        self.assertSchemasEqual(procedure_occurrence_df.schema, PROCEDURE_OCCURRENCE_SCHEMA)
        assert_schema_equality(procedure_occurrence_df.schema, PROCEDURE_OCCURRENCE_SCHEMA, ignore_nullable=True)

    def test_entries_to_encounter(self) -> None:
        entries_df = FhirBundlesToCdm(self.spark).loadEntries(TEST_BUNDLE_PATH)
        encounter_df = entries_to_encounter(entries_df)
    def test_entries_to_encounter(self, get_entries_df) -> None:
        encounter_df = entries_to_encounter(get_entries_df)
        assert encounter_df.count() == 128
        self.assertSchemasEqual(encounter_df.schema, ENCOUNTER_SCHEMA)
        assert_schema_equality(encounter_df.schema, ENCOUNTER_SCHEMA, ignore_nullable=True)


class TestTransformers(SparkTest):
class TestTransformers:

    def test_loadEntries(self) -> None:
        entries_df = FhirBundlesToCdm(self.spark).loadEntries(TEST_BUNDLE_PATH)
        assert entries_df.count() == 1872
        self.assertSchemasEqual(entries_df.schema, JSON_ENTRY_SCHEMA)
    def test_loadEntries(self, get_entries_df) -> None:
        assert get_entries_df.count() == 1872
        assert_schema_equality(get_entries_df.schema, JSON_ENTRY_SCHEMA, ignore_nullable=True)

    def test_fhir_bundles_to_omop_cdm(self) -> None:
        fhir_model = FhirBundles(TEST_BUNDLE_PATH)
        cdm_model = OmopCdm(TEST_DATABASE)
        FhirBundlesToCdm(self.spark).transform(fhir_model, cdm_model, True)
        tables = [t.tableName for t in self.spark.sql(f"SHOW TABLES FROM {TEST_DATABASE}").collect()]
    def test_fhir_bundles_to_omop_cdm(self, spark_session, fhir_model, cdm_model) -> None:
        FhirBundlesToCdm(spark_session).transform(fhir_model, cdm_model, True)
        tables = [t.tableName for t in spark_session.sql(f"SHOW TABLES FROM {TEST_DATABASE}").collect()]

        assert TEST_DATABASE in cdm_model.listDatabases()
        assert PERSON_TABLE in tables
        assert CONDITION_TABLE in tables
        assert PROCEDURE_OCCURRENCE_TABLE in tables
        assert ENCOUNTER_TABLE in tables

        assert self.spark.table(f"{TEST_DATABASE}.person").count() == 3
        assert spark_session.table(f"{TEST_DATABASE}.person").count() == 3

    def test_omop_cdm_to_person_dashboard(self) -> None:
        transformer = CdmToPersonDashboard(self.spark)
        fhir_model = FhirBundles(TEST_BUNDLE_PATH)
        cdm_model = OmopCdm(TEST_DATABASE)
    def test_omop_cdm_to_person_dashboard(self, spark_session, fhir_model, cdm_model) -> None:
        transformer = CdmToPersonDashboard(spark_session)
        person_dash_model = PersonDashboard()

        FhirBundlesToCdm(self.spark).transform(fhir_model, cdm_model, True)
        CdmToPersonDashboard(self.spark).transform(cdm_model, person_dash_model)
        FhirBundlesToCdm(spark_session).transform(fhir_model, cdm_model, True)
        CdmToPersonDashboard(spark_session).transform(cdm_model, person_dash_model)
        person_dashboard_df = person_dash_model.summary()
        self.assertSchemasEqual(CONDITION_SUMMARY_SCHEMA, person_dashboard_df.select('conditions').schema)
        assert_schema_equality(CONDITION_SUMMARY_SCHEMA, person_dashboard_df.select('conditions').schema, ignore_nullable=True)

1 comment on commit 4f3443d

@Sinchana-KJ
Contributor

@kermany how is this updated into the 4 tables PERSON_TABLE, CONDITION_TABLE, PROCEDURE_OCCURRENCE_TABLE, and ENCOUNTER_TABLE? The FHIR JSON file has many more resource types. Do these tables remain the same for all the resource types in a bundle file?
