Skip to content

Commit

Permalink
V1 of MDTF Works
Browse files Browse the repository at this point in the history
  • Loading branch information
DanCRichards committed May 12, 2024
1 parent dcde622 commit 05df6ca
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 116 deletions.
35 changes: 31 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,41 @@
# Metadata-Driven-Transformation-Framework (MDTF)
A Postgres Based Meta Data Driven Transformation Framework to leverage column.
A Postgres program that enables users to create transformation logic using a data-driven approach.

TL;DR: Insert information into tables, run some code, and it performs the transformations for you.

## Introduction
Want to use a data driven approach to creating your transformation logic? This framework is for you.

## Problems
### Problem it is trying to solve
- Handling multiple inputs that are of a similar nature but have different column names or some level of complexity.
- Allowing a SQL code-first approach to generating transformation logic, and allowing the information schema to be leveraged.


## Diagram
## Transformation Core Concepts

![Diagram of Meta Data Driven Framework](docs/assets/mtfdiagram.png)
### Diagram
![Diagram of Meta Data Driven Framework](docs/assets/mtfdiagram.png)

### Transformations
This entity represents a transformation that is applied to a table.
Consider this the table you ```INSERT INTO```

### Target Mapping
This represents an output column, which can have many inputs. A target mapping can have a function applied to it to transform the data.
Consider this the column you ```INSERT INTO```

### Function
The definition of a function that can be applied to a target mapping.
Consider this the function you apply to a column in a ```SELECT``` statement.

### Function Input
The definitions of the inputs which can be inserted into a function.
Consider this the parameters you pass to a function in a ```SELECT``` statement.


### Join Mappings
The definitions of tables to join to when performing a transformation.
Consider this the tables you ```JOIN``` to in a ```SELECT``` statement.


## Usage
18 changes: 18 additions & 0 deletions src/functions/01_DEFINE_FUNCTION_MAPPINGS.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
Seeds the functions table with the functions the framework knows about.
The function names inserted here mirror the labels of the function_name enum,
which gives a level of typing between the function names and their references.
When you add a function, add it in both places.
*/

SET search_path TO mtf;

-- Empty the table before reseeding it. CASCADE also truncates every table
-- that references functions through a foreign key.
TRUNCATE functions CASCADE;

-- Predefined functions; the empty function name is the direct-assignment entry.
INSERT INTO functions
    (function_name, function_definition)
VALUES
    ('',                  'Directly assigns values without transformation'),
    ('safe_to_timestamp', 'Converts string to timestamp safely'),
    ('str_to_boolean',    'Returns a fixed boolean value'),
    ('return_enum',       'Returns a specified enum value');


71 changes: 0 additions & 71 deletions src/functions/function_index.sql

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
SET SEARCH_PATH=mtf;
CREATE OR REPLACE FUNCTION safe_to_timestamp(text, text)
RETURNS TIMESTAMP AS $$
BEGIN
Expand Down
29 changes: 21 additions & 8 deletions src/mappings/01_DEFINE_MAPPINGS.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ CREATE SCHEMA IF NOT EXISTS mtf; -- meta transformation framework
SET SEARCH_PATH = mtf;

-- Drop existing tables and types if they exist
DROP TABLE IF EXISTS join_mappings;
DROP TABLE IF EXISTS join_mappings CASCADE ;
DROP TABLE IF EXISTS join_conditions_mapping CASCADE ;
DROP TABLE IF EXISTS input_mapping;
DROP TABLE IF EXISTS target_mapping;
DROP TABLE IF EXISTS transformations;
Expand All @@ -13,6 +14,16 @@ DROP TYPE IF EXISTS join_type CASCADE;
-- Recreate enum for join types.
-- perform_transformation builds each join as <join_type> || ' JOIN ' || <table>,
-- so a label has to be the bare keyword: 'FULL' produces FULL JOIN, whereas the
-- legacy 'FULL JOIN' label produces the invalid FULL JOIN JOIN. 'FULL JOIN' is
-- kept only so that existing inserts using it still load; prefer 'FULL'.
CREATE TYPE join_type AS ENUM ('INNER', 'LEFT', 'RIGHT', 'FULL', 'FULL JOIN');


-- Enum of the function names that mappings may refer to. The empty label is
-- the one used for direct assignment (no transformation function).
-- The type is dropped first if it already exists; CASCADE also drops the
-- objects that depend on it.
DROP TYPE IF EXISTS function_name CASCADE;
CREATE TYPE function_name AS ENUM ('', 'safe_to_timestamp', 'str_to_boolean', 'return_enum');


-- Functions table
CREATE TABLE functions
(
Expand All @@ -32,10 +43,12 @@ CREATE TABLE function_inputs
-- Transformations table using natural keys
CREATE TABLE transformations
(
transformation_key TEXT,
source_table_name TEXT,
target_table_name TEXT,
description TEXT,
PRIMARY KEY (source_table_name, target_table_name)
PRIMARY KEY (source_table_name, target_table_name),
UNIQUE (transformation_key)
);

-- Target Mapping table using natural keys
Expand Down Expand Up @@ -65,19 +78,19 @@ CREATE TABLE input_mapping
-- Join Mappings table using transformation natural keys
CREATE TABLE join_mappings
(
transformation_key TEXT,
join_mapping_key TEXT PRIMARY KEY, -- Human-readable, string-based identifier
source_table_name TEXT,
target_table_name TEXT,
join_type join_type, -- ENUM type for join methods such as 'INNER', 'LEFT', etc.
FOREIGN KEY (source_table_name, target_table_name) REFERENCES transformations(source_table_name, target_table_name) ON DELETE CASCADE
join_table_name TEXT,
join_type join_type, -- ENUM type for join methods such as 'INNER', 'LEFT', etc.
FOREIGN KEY (transformation_key) REFERENCES transformations(transformation_key) ON DELETE CASCADE
);

CREATE TABLE join_conditions_mapping
(
condition_id SERIAL PRIMARY KEY,
join_mapping_id INT NOT NULL,
join_mapping_key TEXT NOT NULL,
lhs_column TEXT NOT NULL, -- Left-hand side of the condition, typically a column name
rhs_column TEXT NOT NULL, -- Right-hand side of the condition, can be a column name or a literal value
operator TEXT NOT NULL CHECK (operator IN ('=', '!=', '<', '>', '<=', '>=')), -- Comparison operator
FOREIGN KEY (join_mapping_id) REFERENCES join_mappings(join_mapping_key) ON DELETE CASCADE
FOREIGN KEY (join_mapping_key) REFERENCES join_mappings(join_mapping_key) ON DELETE CASCADE
);
84 changes: 62 additions & 22 deletions src/procs/02_DEFINE_PROCS.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
CREATE OR REPLACE PROCEDURE process_notes_transformation()
CREATE OR REPLACE PROCEDURE perform_transformation(
input_transformation_key TEXT
)
LANGUAGE plpgsql
AS $$
DECLARE
Expand All @@ -7,44 +9,82 @@ DECLARE
source_list TEXT := '';
joins TEXT := '';
record RECORD;
input_clause TEXT;
function_inputs RECORD;
condition_record RECORD;
input_source_table_name TEXT;
input_target_table_name TEXT;
BEGIN
-- Initialize SQL for insert

/* Declare the source and target table names */
SELECT target_table_name, source_table_name INTO input_target_table_name, input_source_table_name
FROM transformations
WHERE transformation_key = input_transformation_key;

/* Initialise the SQL statement with the target table name */
SELECT 'INSERT INTO ' || target_table_name INTO sql_text
FROM transformations
WHERE source_table_name = 'landing.notes' AND target_table_name = 'staging.notes';
WHERE source_table_name = input_source_table_name AND target_table_name = input_target_table_name;

sql_text := sql_text || ' (';

-- Loop through target mappings to build target column list and source value list
FOR record IN SELECT tm.target_table_name, tm.target_column_name, tm.function_name, im.source_table_name, im.source_column_name, jm.join_type, jm.join_condition
/* Loop through target mappings to build target column list and source value list */
FOR record IN SELECT tm.target_table_name, tm.target_column_name, tm.function_name
FROM target_mapping tm
LEFT JOIN input_mapping im ON tm.target_table_name = im.target_table_name AND tm.target_column_name = im.target_column_name
LEFT JOIN join_mappings jm ON jm.source_table_name = im.source_table_name AND jm.target_table_name = tm.target_table_name
WHERE tm.target_table_name = 'staging.notes'
WHERE tm.target_table_name = input_target_table_name
LOOP
-- Add columns to the insert list
IF record.target_column_name IS NOT NULL THEN
target_list := target_list || quote_ident(record.target_column_name) || ', ';
END IF;

IF record.source_table_name IS NOT NULL AND TRIM(record.source_table_name) <> '' THEN
-- When there is a source_table_name, build using table and column
source_list := source_list || ' ' || record.function_name || '(' || record.source_table_name || '.' || quote_ident(record.source_column_name) || ')' || ' AS ' || quote_ident(record.target_column_name) || ', ';
-- Reset input_clause for each column
input_clause := '';

-- Collect all inputs for this function/target column
FOR function_inputs IN SELECT im.source_table_name, im.source_column_name
FROM input_mapping im
WHERE im.target_table_name = record.target_table_name
AND im.target_column_name = record.target_column_name
ORDER BY im.input_order
LOOP
-- Construct input_clause based on presence of a source table
IF function_inputs.source_table_name IS NOT NULL AND TRIM(function_inputs.source_table_name) <> '' THEN
input_clause := input_clause || function_inputs.source_table_name || '.' || quote_ident(function_inputs.source_column_name) || ', ';
ELSE
input_clause := input_clause || '''' || function_inputs.source_column_name || '''' || ', ';
END IF;
END LOOP;

-- Remove the trailing comma and space from input_clause
input_clause := TRIM(TRAILING ', ' FROM input_clause);

-- Build source list using function if applicable or directly use the column
IF record.function_name IS NOT NULL AND input_clause <> '' THEN
source_list := source_list || ' ' || record.function_name || '(' || input_clause || ')' || ' AS ' || quote_ident(record.target_column_name) || ', ';
ELSE
-- Handling case where there's a function, but no table name (treating source_column_name as a constant or predefined value)
source_list := source_list || ' ' || record.function_name || '(' || quote_literal(record.source_column_name) || ')' || ' AS ' || quote_ident(record.target_column_name) || ', ';
-- Directly use the input_clause as the source when no function is specified
source_list := source_list || ' ' || input_clause || ' AS ' || quote_ident(record.target_column_name) || ', ';
END IF;
END LOOP;

-- Build joins from join mappings and their conditions
FOR record IN SELECT jm.join_mapping_key, jm.join_type, jm.join_table_name
FROM join_mappings jm
WHERE jm.transformation_key = 'test_notes'
LOOP
joins := joins || ' ' || record.join_type || ' JOIN ' || record.join_table_name || ' ON ';

-- Add joins if applicable
IF record.join_type IS NOT NULL AND record.join_condition IS NOT NULL AND record.source_table_name IS NOT NULL THEN
joins := joins || ' ' || record.join_type || ' JOIN ' || record.source_table_name || ' ON ' || record.join_condition || ' ';
END IF;
-- Append all conditions for this join
FOR condition_record IN SELECT jcm.lhs_column, jcm.rhs_column, jcm.operator
FROM join_conditions_mapping jcm
WHERE jcm.join_mapping_key = record.join_mapping_key
LOOP
joins := joins || condition_record.lhs_column || ' ' || condition_record.operator || ' ' || condition_record.rhs_column || ' AND ';
END LOOP;

-- Add joins if applicable
IF record.join_type IS NOT NULL AND record.join_condition IS NOT NULL AND record.source_table_name IS NOT NULL THEN
joins := joins || ' ' || record.join_type || ' JOIN ' || record.source_table_name || ' ON ' || record.join_condition || ' ';
END IF;
-- Remove the trailing ' AND '
joins := RTRIM(joins, ' AND ');
END LOOP;

-- Remove trailing commas and finalize the SQL statement
Expand All @@ -58,4 +98,4 @@ BEGIN
-- Execute the dynamic SQL
EXECUTE sql_text;
END
$$;
$$;
37 changes: 26 additions & 11 deletions src/test_notes.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,49 @@ TRUNCATE TABLE target_mapping CASCADE;
TRUNCATE TABLE input_mapping CASCADE;

-- Inserting transformations
INSERT INTO transformations (source_table_name, target_table_name, description)
INSERT INTO transformations (transformation_key, source_table_name, target_table_name, description)
VALUES
('landing.notes', 'staging.notes', 'Transforms notes from landing area to staging with enriched and formatted fields.');
('test_notes','landing.notes', 'staging.notes', 'Transforms notes from landing area to staging with enriched and formatted fields.');

-- Inserting join mappings using the natural key relationships
INSERT INTO join_mappings (source_table_name, target_table_name, join_type, join_condition)
-- Inserting join mappings with human-readable IDs
INSERT INTO join_mappings (transformation_key, join_mapping_key, join_table_name, join_type)
VALUES
('landing.notes', 'staging.notes', 'INNER', 'landing.notes.personal_id = dbo.questionnaire_response.PersonalId'),
('landing.notes', 'staging.notes', 'LEFT', 'landing.notes.userCreated = staging.users.BubbleGateuserId');
('test_notes','NotesResponseJoin', 'dbo.questionnaire_response', 'INNER'),
('test_notes','NotesUserJoin', 'staging.users', 'LEFT');

-- Inserting join conditions using the new table structure
INSERT INTO join_conditions_mapping (join_mapping_key, lhs_column, rhs_column, operator)
VALUES
('NotesResponseJoin', 'landing.notes.personal_id', 'dbo.questionnaire_response."PersonalId"', '='),
('NotesUserJoin', 'landing.notes."userCreated"', 'staging.users."BubbleGateuserId"', '=');


-- Inserting target mappings
INSERT INTO target_mapping (target_table_name, target_column_name, function_name, source_table_name)
VALUES
('staging.notes', 'QuestionnaireResponseId', 'direct_assignment', 'landing.notes'),
('staging.notes', 'Note', 'direct_assignment', 'landing.notes'),
('staging.notes', 'QuestionnaireResponseId', '', 'landing.notes'),
('staging.notes', 'Note', '', 'landing.notes'),
('staging.notes', 'CreatedTimeStamp', 'safe_to_timestamp', 'landing.notes'),
('staging.notes', 'SoftDeleted', 'str_to_boolean', 'landing.notes'),
('staging.notes', 'CreatedByUserAuth0Id', 'direct_assignment', 'landing.notes'),
('staging.notes', 'userCreated', 'direct_assignment', 'landing.notes'),
('staging.notes', 'NoteType', 'direct_assignment', 'landing.notes');
('staging.notes', 'CreatedByUserAuth0Id', '', 'landing.notes'),
('staging.notes', 'userCreated', '', 'landing.notes'),
('staging.notes', 'NoteType', '', 'landing.notes');

-- Inserting input mappings, ensuring they reference the target mapping directly
INSERT INTO input_mapping (target_table_name, target_column_name, source_table_name, source_column_name, input_order, in_group_by)
VALUES
('staging.notes', 'QuestionnaireResponseId', 'dbo.questionnaire_response', 'QuestionnaireResponseId', 1, FALSE),
('staging.notes', 'Note', 'landing.notes', 'details', 1, FALSE),
('staging.notes', 'CreatedTimeStamp', 'landing.notes', 'dateCreated', 1, FALSE),
('staging.notes', 'CreatedTimeStamp', '', 'YYYY-MM-DD', 2, FALSE),
('staging.notes', 'SoftDeleted', '', 'false', 1, TRUE), -- for constant values
('staging.notes', 'CreatedByUserAuth0Id', 'staging.users', 'UserAuth0Id', 1, FALSE),
('staging.notes', 'userCreated', 'landing.notes', 'userCreated', 1, FALSE),
('staging.notes', 'NoteType', '', 'Private', 1, TRUE); -- Assuming 'Private' is a constant for enum


call perform_transformation('test_notes')


SELECT * FROM staging.notes

0 comments on commit 05df6ca

Please sign in to comment.