Skip to content

Commit

Permalink
Add workflow to calculate source data schema diff (#528)
Browse files Browse the repository at this point in the history
* Add workflow to calculate source data schema diff

* Fix branch name

* Try cloning repo

update

update

* typo

Check all edge cases of changelog

test

test

test

* write regardless

* append

* update script to also display date

reformat

* Reformat the md file

* More formatting and testng edge cases

* update script

* revert back all schema changes

test

test

figured it out

* field addition and deletion

* lint

another take at lint

* cleanup

* Modularize python script

* update script with black

* lint

create

make directory

make directory

cleanup

* test changelog build

* push to branch

push to branch

push to branch

push to branch

* push to branch

push to branch

push to branch

push to branch

push to branch

push to branch

push to branch

* Update changelog for Source data

* revert schema changes

* revert schema changes

* Update .github/workflows/update_dbt_marts_schema_changelog.yml

* Add docstring

* Minor refactor

* Update scripts/update_source_data_schema_changelog.py

Co-authored-by: chowbao <[email protected]>

* Update scripts/update_source_data_schema_changelog.py

Co-authored-by: chowbao <[email protected]>

* Update scripts/update_source_data_schema_changelog.py

Co-authored-by: chowbao <[email protected]>

* Update scripts/update_source_data_schema_changelog.py

Co-authored-by: chowbao <[email protected]>

* clear contents of changelog

---------

Co-authored-by: GitHub Action <[email protected]>
Co-authored-by: chowbao <[email protected]>
  • Loading branch information
3 people authored Nov 6, 2024
1 parent 56f581c commit d6ca427
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 0 deletions.
48 changes: 48 additions & 0 deletions .github/workflows/update_source_data_schema_changelog.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
name: Update changelog for Source Data

on:
push:

permissions:
contents: write

concurrency:
group: ${{ github.workflow }}-${{ github.ref_protected == 'true' && github.sha || github.ref }}-{{ github.event_name }}
cancel-in-progress: true

jobs:
build:
runs-on: ubuntu-latest
steps:
- name: Checkout Repository
uses: actions/checkout@v3

- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: 3.12

- name: Run Bash Script
run: |
set -x
cd $GITHUB_WORKSPACE
OLD_SCHEMAS_DIR=original_schemas
git clone --branch master https://github.com/stellar/stellar-etl-airflow.git repo_master_copy
mkdir OLD_SCHEMAS_DIR
cp -r repo_master_copy/schemas/ $OLD_SCHEMAS_DIR/
export OLD_SCHEMAS_DIR
python3 scripts/update_source_data_schema_changelog.py
rm -rf $OLD_SCHEMAS_DIR
rm -rf repo_master_copy
- name: Commit and Push Changes
run: |
git config --local user.email "[email protected]"
git config --local user.name "GitHub Action"
git add changelog/source_data.md
if git commit -m "Update changelog for Source data"; then
echo "Changes committed."
git push
else
echo "No changes to commit."
fi
Empty file added changelog/source_data.md
Empty file.
197 changes: 197 additions & 0 deletions scripts/update_source_data_schema_changelog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
import json
import os
import sys
from datetime import datetime

OLD_SCHEMAS_DIR = os.environ.get("OLD_SCHEMAS_DIR")
NEW_SCHEMAS_DIR = "schemas"
CHANGELOG_FILEPATH = "changelog/source_data.md"


def read_json_file(filepath: str) -> {}:
with open(filepath, "r") as rfp:
try:
return json.load(rfp)
except:
sys.exit(f"Not a valid JSON at filepath {filepath}.")


def read_file(filepath: str) -> str:
with open(filepath, "r") as rfp:
return rfp.read()


def write_file(filepath: str, content: str, mode="a") -> None:
with open(filepath, mode) as wfp:
wfp.write(content)


def sort_schema_changes(schema_changes: {}) -> {}:
sorted_data = {}

for table_name, op_types in sorted(schema_changes.items()):
sorted_data[table_name] = {
op_type: sorted(columns) for op_type, columns in sorted(op_types.items())
}
return sorted_data


def list_files_in_dir(directory: str) -> []:
if not os.path.exists(directory):
sys.exit(f"Directory {directory} does not exist.")
return os.listdir(directory)


def compare_lists(old_list=[], new_list=[]):
old_set = set(old_list)
new_set = set(new_list)

common = old_set & new_set
added = new_set - old_set
deleted = old_set - new_set

return list(common), list(added), list(deleted)


def get_mapped_schema_json(directory: str, schema_name: str) -> {}:
"""
Returns schema object indexed by field name.
Example:
{
"field_name_1": {
"name": field_name_1,
"type": STRING
},
"field_name_2": {
"name": field_name_2,
"type": STRING
},
}
"""
schema_json = read_json_file(os.path.join(directory, schema_name))
schema_json_by_col_name = {column["name"]: column for column in schema_json}
return schema_json_by_col_name


def compare_schemas(schemas=[]) -> {}:
schema_changes = {}

for schema in schemas:
old_schema_json_by_col_name = get_mapped_schema_json(
directory=OLD_SCHEMAS_DIR, schema_name=schema
)
new_schema_json_by_col_name = get_mapped_schema_json(
directory=NEW_SCHEMAS_DIR, schema_name=schema
)

common, added, deleted = compare_lists(
old_list=old_schema_json_by_col_name.keys(),
new_list=new_schema_json_by_col_name.keys(),
)

type_changed = [
(
col,
old_schema_json_by_col_name[col]["type"],
new_schema_json_by_col_name[col]["type"],
)
for col in common
if old_schema_json_by_col_name[col]["type"]
!= new_schema_json_by_col_name[col]["type"]
]

if added or deleted or type_changed:
schema_changes[schema] = {
"column_added": added,
"column_removed": deleted,
"type_changed": type_changed,
}

return schema_changes


def get_print_label_string(label: str) -> str:
return f"\n## {label}:\n" if label else label


def get_print_schemas_string(label="", schemas=[]) -> str:
print_string = ""
if not len(schemas):
return print_string

print_string += get_print_label_string(label)
for schema in schemas:
print_string += f"- {schema.replace('_schema.json', '')}"
return print_string


def get_print_schema_changes_string(label="", schema_changes={}) -> str:
print_string = ""
if not schema_changes:
return print_string

print_string += get_print_label_string(label)

markdown_table = "| Table Name | Operation | Columns |\n"
markdown_table += "|---------------------------------|---------------|----------------------------------------------------|\n"

for schema_name, operations in schema_changes.items():
for operation, columns in operations.items():
if len(columns):
if operation in ["column_added", "column_removed"]:
columns_str = ", ".join(columns)
if operation in ["type_changed"]:
columns_str = ", ".join(
[
f"{column[0]} ({column[1]} -> {column[2]})"
for column in columns
]
)
markdown_table += f"| {get_print_schemas_string(schemas=[schema_name]):<31} | {operation:<13} | {columns_str:<48} |\n"
print_string += markdown_table
return print_string


def generate_changelog(schemas_added=[], schemas_deleted=[], schemas_changes={}) -> str:
new_changelog = ""
if schemas_added or schemas_deleted or schemas_changes:
current_date = datetime.now().strftime("%Y-%m-%d")
new_changelog += get_print_label_string(current_date)

new_changelog += get_print_schemas_string(
label="Tables Added", schemas=schemas_added
)
new_changelog += get_print_schemas_string(
label="Tables Deleted", schemas=schemas_deleted
)

sorted_schema_changes = sort_schema_changes(schemas_changes)
new_changelog += get_print_schema_changes_string(
label="Schema Changes", schema_changes=sorted_schema_changes
)
return new_changelog


def main():
existing_changelog = read_file(filepath=CHANGELOG_FILEPATH)
old_schema_filepaths = list_files_in_dir(directory=OLD_SCHEMAS_DIR)
new_schema_filepaths = list_files_in_dir(directory=NEW_SCHEMAS_DIR)

common, added, deleted = compare_lists(
old_list=old_schema_filepaths, new_list=new_schema_filepaths
)
schema_changes = compare_schemas(common)
new_changelog = generate_changelog(
schemas_added=added, schemas_deleted=deleted, schemas_changes=schema_changes
)

if len(new_changelog):
# TODO: Append to same date if multiple changelog commited in same day
write_file(
filepath=CHANGELOG_FILEPATH, mode="w", content=new_changelog + "\n\n"
)
write_file(filepath=CHANGELOG_FILEPATH, mode="a", content=existing_changelog)


if __name__ == "__main__":
main()

0 comments on commit d6ca427

Please sign in to comment.