feat: Verify data in tests (row count and first row) #215

Merged
merged 3 commits on Nov 27, 2023
Changes from 1 commit
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 76 additions & 88 deletions target_postgres/tests/test_standard_target.py
@@ -4,6 +4,7 @@
import io
import uuid
from contextlib import redirect_stdout
from decimal import Decimal
from pathlib import Path

import jsonschema
@@ -116,6 +117,42 @@ def remove_metadata_columns(row: dict) -> dict:
    return new_row
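
For orientation, a minimal sketch of the remove_metadata_columns helper used throughout these tests, assuming it simply drops the _sdc_* bookkeeping columns the target adds to every row (the real implementation sits earlier in this file and is collapsed in this view):

def remove_metadata_columns(row: dict) -> dict:
    # Assumed behaviour: keep only the columns that came from the source record,
    # dropping the Singer SDK's _sdc_* metadata columns.
    return {key: value for key, value in row.items() if not key.startswith("_sdc_")}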


def verify_data(
    target: TargetPostgres,
    table_name: str,
    number_of_rows: int = 1,
    primary_key: str | None = None,
    first_row: dict | None = None,
):
    """Checks whether the data in a table matches a provided data sample.

    Args:
        target: The target to obtain a database connection from.
        table_name: The name of the table to check; it is qualified with the
            target's default_target_schema.
        number_of_rows: The expected number of rows that should be in the table.
        primary_key: The primary key of the table.
        first_row: A dictionary representing the full contents of the first row in
            the table, as determined by the lowest primary_key value.
    """
    engine = create_engine(target)
    full_table_name = f"{target.config['default_target_schema']}.{table_name}"
    with engine.connect() as connection:
        if primary_key is not None and first_row is not None:
            result = connection.execute(
                sqlalchemy.text(
                    f"SELECT * FROM {full_table_name} ORDER BY {primary_key}"
                )
            )
            assert result.rowcount == number_of_rows
            result_dict = remove_metadata_columns(result.first()._asdict())
            assert result_dict == first_row
        else:
            result = connection.execute(
                sqlalchemy.text(f"SELECT COUNT(*) FROM {full_table_name}")
            )
            assert result.first()[0] == number_of_rows
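
As used in the tests below, the helper has two modes: pass a primary_key and first_row to check both the row count and the contents of the lowest-keyed row, or omit them to check the row count alone. For example, two calls taken from the tests in this diff:

verify_data(postgres_target, "test_optional_attributes", 4, "id", {"id": 1, "optional": "This is optional"})
verify_data(postgres_target, "test_no_pk", 16)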


def test_sqlalchemy_url_config(postgres_config_no_ssl):
"""Be sure that passing a sqlalchemy_url works

Expand Down Expand Up @@ -230,10 +267,11 @@ def test_special_chars_in_attributes(postgres_target):
singer_file_to_target(file_name, postgres_target)


# TODO test that data is correctly set
def test_optional_attributes(postgres_target):
    file_name = "optional_attributes.singer"
    singer_file_to_target(file_name, postgres_target)
    row = {"id": 1, "optional": "This is optional"}
    verify_data(postgres_target, "test_optional_attributes", 4, "id", row)


def test_schema_no_properties(postgres_target):
@@ -242,101 +280,49 @@ def test_schema_no_properties(postgres_target):
    singer_file_to_target(file_name, postgres_target)


# TODO test that data is correct
def test_schema_updates(postgres_target):
    file_name = "schema_updates.singer"
    singer_file_to_target(file_name, postgres_target)
    row = {
        "id": 1,
        "a1": Decimal("101"),
        "a2": "string1",
        "a3": None,
        "a4": None,
        "a5": None,
        "a6": None,
    }
    verify_data(postgres_target, "test_schema_updates", 6, "id", row)


# TODO test that data is correct
def test_multiple_state_messages(postgres_target):
    file_name = "multiple_state_messages.singer"
    singer_file_to_target(file_name, postgres_target)
    row = {"id": 1, "metric": 100}
    verify_data(postgres_target, "test_multiple_state_messages_a", 6, "id", row)
    row = {"id": 1, "metric": 110}
    verify_data(postgres_target, "test_multiple_state_messages_b", 6, "id", row)


def test_relational_data(postgres_target):
    engine = create_engine(postgres_target)
    file_name = "user_location_data.singer"
    singer_file_to_target(file_name, postgres_target)

    file_name = "user_location_upsert_data.singer"
    singer_file_to_target(file_name, postgres_target)

    schema_name = postgres_target.config["default_target_schema"]
    user = {"id": 1, "name": "Johny"}
    location = {"id": 1, "name": "Philly"}
    user_in_location = {
        "id": 1,
        "user_id": 1,
        "location_id": 4,
        "info": {"weather": "rainy", "mood": "sad"},
    }

    with engine.connect() as connection:

Member:
I like that you want to consolidate the tests, but I think this test had a pretty solid data check and you removed a lot of the checks that were there before. We could just leave this one alone, or you could expand your verify_data function to be able to work with multiple rows of data.

Contributor Author:
Expanded to work with multiple rows.
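
A rough sketch of what that expansion could look like (the name verify_data_multi, the check_data parameter, and the list handling are illustrative of the idea, not necessarily the exact code that was later committed); it reuses the same helpers as verify_data above:

def verify_data_multi(
    target: TargetPostgres,
    table_name: str,
    number_of_rows: int = 1,
    primary_key: str | None = None,
    check_data: dict | list[dict] | None = None,
):
    """Sketch: compare either a single expected row or every row against check_data."""
    engine = create_engine(target)
    full_table_name = f"{target.config['default_target_schema']}.{table_name}"
    with engine.connect() as connection:
        if primary_key is not None and check_data is not None:
            result = connection.execute(
                sqlalchemy.text(
                    f"SELECT * FROM {full_table_name} ORDER BY {primary_key}"
                )
            )
            assert result.rowcount == number_of_rows
            rows = [remove_metadata_columns(row._asdict()) for row in result.all()]
            if isinstance(check_data, dict):
                # Single expected row: compare only the lowest-keyed row.
                assert rows[0] == check_data
            else:
                # List of expected rows: compare the entire ordered result set.
                assert rows == check_data
        else:
            result = connection.execute(
                sqlalchemy.text(f"SELECT COUNT(*) FROM {full_table_name}")
            )
            assert result.first()[0] == number_of_rows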

        expected_test_users = [
            {"id": 1, "name": "Johny"},
            {"id": 2, "name": "George"},
            {"id": 3, "name": "Jacob"},
            {"id": 4, "name": "Josh"},
            {"id": 5, "name": "Jim"},
            {"id": 8, "name": "Thomas"},
            {"id": 12, "name": "Paul"},
            {"id": 13, "name": "Mary"},
        ]

        full_table_name = f"{schema_name}.test_users"
        result = connection.execute(
            sqlalchemy.text(f"SELECT * FROM {full_table_name} ORDER BY id")
        )
        result_dict = [remove_metadata_columns(row._asdict()) for row in result.all()]
        assert result_dict == expected_test_users

        expected_test_locations = [
            {"id": 1, "name": "Philly"},
            {"id": 2, "name": "NY"},
            {"id": 3, "name": "San Francisco"},
            {"id": 6, "name": "Colorado"},
            {"id": 8, "name": "Boston"},
        ]

        full_table_name = f"{schema_name}.test_locations"
        result = connection.execute(
            sqlalchemy.text(f"SELECT * FROM {full_table_name} ORDER BY id")
        )
        result_dict = [remove_metadata_columns(row._asdict()) for row in result.all()]
        assert result_dict == expected_test_locations

        expected_test_user_in_location = [
            {
                "id": 1,
                "user_id": 1,
                "location_id": 4,
                "info": {"weather": "rainy", "mood": "sad"},
            },
            {
                "id": 2,
                "user_id": 2,
                "location_id": 3,
                "info": {"weather": "sunny", "mood": "satisfied"},
            },
            {
                "id": 3,
                "user_id": 1,
                "location_id": 3,
                "info": {"weather": "sunny", "mood": "happy"},
            },
            {
                "id": 6,
                "user_id": 3,
                "location_id": 2,
                "info": {"weather": "sunny", "mood": "happy"},
            },
            {
                "id": 14,
                "user_id": 4,
                "location_id": 1,
                "info": {"weather": "cloudy", "mood": "ok"},
            },
        ]

        full_table_name = f"{schema_name}.test_user_in_location"
        result = connection.execute(
            sqlalchemy.text(f"SELECT * FROM {full_table_name} ORDER BY id")
        )
        result_dict = [remove_metadata_columns(row._asdict()) for row in result.all()]
        assert result_dict == expected_test_user_in_location
    verify_data(postgres_target, "test_users", 8, "id", user)
    verify_data(postgres_target, "test_locations", 5, "id", location)
    verify_data(postgres_target, "test_user_in_location", 5, "id", user_in_location)


def test_no_primary_keys(postgres_target):
@@ -345,9 +331,7 @@ def test_no_primary_keys(postgres_target):
    table_name = "test_no_pk"
    full_table_name = postgres_target.config["default_target_schema"] + "." + table_name
    with engine.connect() as connection, connection.begin():
        result = connection.execute(
            sqlalchemy.text(f"DROP TABLE IF EXISTS {full_table_name}")
        )
        connection.execute(sqlalchemy.text(f"DROP TABLE IF EXISTS {full_table_name}"))
    file_name = f"{table_name}.singer"
    singer_file_to_target(file_name, postgres_target)

@@ -360,30 +344,28 @@ def test_no_primary_keys(postgres_target):
    file_name = f"{table_name}_append.singer"
    singer_file_to_target(file_name, postgres_target)

    # Will populate us with 22 records, we run this twice
    with engine.connect() as connection:
        result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}"))
        assert result.rowcount == 16
    verify_data(postgres_target, table_name, 16)


def test_no_type(postgres_target):
file_name = "test_no_type.singer"
singer_file_to_target(file_name, postgres_target)


# TODO test that data is correct
def test_duplicate_records(postgres_target):
file_name = "duplicate_records.singer"
singer_file_to_target(file_name, postgres_target)
row = {"id": 1, "metric": 100}
verify_data(postgres_target, "test_duplicate_records", 2, "id", row)


# TODO test that data is correct
def test_array_data(postgres_target):
file_name = "array_data.singer"
singer_file_to_target(file_name, postgres_target)
row = {"id": 1, "fruits": ["apple", "orange", "pear"]}
verify_data(postgres_target, "test_carts", 4, "id", row)


# TODO test that data is correct
def test_encoded_string_data(postgres_target):
"""
We removed NUL characters from the original encoded_strings.singer as postgres doesn't allow them.
@@ -396,6 +378,12 @@

    file_name = "encoded_strings.singer"
    singer_file_to_target(file_name, postgres_target)
    row = {"id": 1, "info": "simple string 2837"}
    verify_data(postgres_target, "test_strings", 11, "id", row)
    row = {"id": 1, "info": {"name": "simple", "value": "simple string 2837"}}
    verify_data(postgres_target, "test_strings_in_objects", 11, "id", row)
    row = {"id": 1, "strings": ["simple string", "απλή συμβολοσειρά", "简单的字串"]}
    verify_data(postgres_target, "test_strings_in_arrays", 6, "id", row)


def test_tap_appl(postgres_target):