From 8c9a893b103a0ba441eb5787bcd4020ec2d07149 Mon Sep 17 00:00:00 2001 From: y-bruin Date: Thu, 12 Dec 2024 16:03:56 +0530 Subject: [PATCH 01/11] Refactor Upstream structure to use pointer for Columns; update lineage processing to handle upstream columns correctly and initialize empty upstreams in assets --- pkg/pipeline/comment.go | 2 +- pkg/pipeline/lineage.go | 14 +++++++------- pkg/pipeline/pipeline.go | 8 ++++---- pkg/sqlparser/parser.go | 2 +- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/pkg/pipeline/comment.go b/pkg/pipeline/comment.go index bd88bcbba..808efcc29 100644 --- a/pkg/pipeline/comment.go +++ b/pkg/pipeline/comment.go @@ -211,7 +211,7 @@ func commentRowsToTask(commentRows []string) (*Asset, error) { case "depends": values := strings.Split(value, ",") for _, v := range values { - task.Upstreams = append(task.Upstreams, Upstream{Type: "asset", Value: strings.TrimSpace(v), Columns: make([]DependsColumn, 0)}) + task.Upstreams = append(task.Upstreams, Upstream{Type: "asset", Value: strings.TrimSpace(v), Columns: &[]DependsColumn{}}) } continue diff --git a/pkg/pipeline/lineage.go b/pkg/pipeline/lineage.go index c62814d5f..bb1a43692 100644 --- a/pkg/pipeline/lineage.go +++ b/pkg/pipeline/lineage.go @@ -98,25 +98,25 @@ func (p *LineageExtractor) processLineageColumns(foundPipeline *Pipeline, asset return errors.New("asset cannot be nil") } - upstreams := []Upstream{} + upstreams := make([]Upstream, 0) for _, up := range asset.Upstreams { upstream := up lineage.NonSelectedColumns = append(lineage.NonSelectedColumns, lineage.Columns...) dict := map[string]bool{} for _, lineageCol := range lineage.NonSelectedColumns { - for _, lineageUpstream := range lineageCol.Upstream { + for _, lineageUpstream := range *lineageCol.Upstream { key := fmt.Sprintf("%s-%s", strings.ToLower(lineageUpstream.Table), strings.ToLower(lineageCol.Name)) if _, ok := dict[key]; !ok { if strings.EqualFold(lineageUpstream.Table, up.Value) { exists := false - for _, col := range upstream.Columns { + for _, col := range *upstream.Columns { if strings.EqualFold(col.Name, lineageCol.Name) { exists = true break } } if !exists { - upstream.Columns = append(upstream.Columns, DependsColumn{ + *upstream.Columns = append(*upstream.Columns, DependsColumn{ Name: lineageCol.Name, }) } @@ -131,7 +131,7 @@ func (p *LineageExtractor) processLineageColumns(foundPipeline *Pipeline, asset for _, lineageCol := range lineage.Columns { if lineageCol.Name == "*" { - for _, upstream := range lineageCol.Upstream { + for _, upstream := range *lineageCol.Upstream { upstreamAsset := foundPipeline.GetAssetByName(upstream.Table) if upstreamAsset == nil { continue @@ -150,7 +150,7 @@ func (p *LineageExtractor) processLineageColumns(foundPipeline *Pipeline, asset continue } - if len(lineageCol.Upstream) == 0 { + if len(*lineageCol.Upstream) == 0 { if err := p.addColumnToAsset(asset, lineageCol.Name, nil, &Column{ Name: lineageCol.Name, Type: lineageCol.Type, @@ -162,7 +162,7 @@ func (p *LineageExtractor) processLineageColumns(foundPipeline *Pipeline, asset continue } - for _, upstream := range lineageCol.Upstream { + for _, upstream := range *lineageCol.Upstream { if upstream.Column == "*" { continue } diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index 6bb889aea..3d8ffe385 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -505,10 +505,10 @@ type DependsColumn struct { } type Upstream struct { - Type string `json:"type" yaml:"type" mapstructure:"type"` - Value string `json:"value" yaml:"value" mapstructure:"value"` - Metadata EmptyStringMap `json:"metadata,omitempty" yaml:"metadata,omitempty" mapstructure:"metadata"` - Columns []DependsColumn `json:"columns,omitempty" yaml:"columns,omitempty" mapstructure:"columns"` + Type string `json:"type" yaml:"type" mapstructure:"type"` + Value string `json:"value" yaml:"value" mapstructure:"value"` + Metadata EmptyStringMap `json:"metadata,omitempty" yaml:"metadata,omitempty" mapstructure:"metadata"` + Columns *[]DependsColumn `json:"columns,omitempty" yaml:"columns,omitempty" mapstructure:"columns"` } func (u Upstream) MarshalYAML() (interface{}, error) { diff --git a/pkg/sqlparser/parser.go b/pkg/sqlparser/parser.go index f78dbeb56..6531693e3 100644 --- a/pkg/sqlparser/parser.go +++ b/pkg/sqlparser/parser.go @@ -121,7 +121,7 @@ type UpstreamColumn struct { type ColumnLineage struct { Name string `json:"name"` - Upstream []UpstreamColumn `json:"upstream"` + Upstream []UpstreamColumn `json:"upstream,omitempty"` Type string `json:"type"` } type Lineage struct { From 06cef6cb7f8b42acd05677bf4630005553a3da90 Mon Sep 17 00:00:00 2001 From: y-bruin Date: Thu, 12 Dec 2024 16:23:12 +0530 Subject: [PATCH 02/11] revert --- pkg/pipeline/comment.go | 2 +- pkg/pipeline/lineage.go | 12 ++++++------ pkg/pipeline/pipeline.go | 10 +++++----- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/pkg/pipeline/comment.go b/pkg/pipeline/comment.go index 808efcc29..da07efbf2 100644 --- a/pkg/pipeline/comment.go +++ b/pkg/pipeline/comment.go @@ -211,7 +211,7 @@ func commentRowsToTask(commentRows []string) (*Asset, error) { case "depends": values := strings.Split(value, ",") for _, v := range values { - task.Upstreams = append(task.Upstreams, Upstream{Type: "asset", Value: strings.TrimSpace(v), Columns: &[]DependsColumn{}}) + task.Upstreams = append(task.Upstreams, Upstream{Type: "asset", Value: strings.TrimSpace(v), Columns: []DependsColumn{}}) } continue diff --git a/pkg/pipeline/lineage.go b/pkg/pipeline/lineage.go index bb1a43692..8d7627d6c 100644 --- a/pkg/pipeline/lineage.go +++ b/pkg/pipeline/lineage.go @@ -104,19 +104,19 @@ func (p *LineageExtractor) processLineageColumns(foundPipeline *Pipeline, asset lineage.NonSelectedColumns = append(lineage.NonSelectedColumns, lineage.Columns...) dict := map[string]bool{} for _, lineageCol := range lineage.NonSelectedColumns { - for _, lineageUpstream := range *lineageCol.Upstream { + for _, lineageUpstream := range lineageCol.Upstream { key := fmt.Sprintf("%s-%s", strings.ToLower(lineageUpstream.Table), strings.ToLower(lineageCol.Name)) if _, ok := dict[key]; !ok { if strings.EqualFold(lineageUpstream.Table, up.Value) { exists := false - for _, col := range *upstream.Columns { + for _, col := range upstream.Columns { if strings.EqualFold(col.Name, lineageCol.Name) { exists = true break } } if !exists { - *upstream.Columns = append(*upstream.Columns, DependsColumn{ + upstream.Columns = append(upstream.Columns, DependsColumn{ Name: lineageCol.Name, }) } @@ -131,7 +131,7 @@ func (p *LineageExtractor) processLineageColumns(foundPipeline *Pipeline, asset for _, lineageCol := range lineage.Columns { if lineageCol.Name == "*" { - for _, upstream := range *lineageCol.Upstream { + for _, upstream := range lineageCol.Upstream { upstreamAsset := foundPipeline.GetAssetByName(upstream.Table) if upstreamAsset == nil { continue @@ -150,7 +150,7 @@ func (p *LineageExtractor) processLineageColumns(foundPipeline *Pipeline, asset continue } - if len(*lineageCol.Upstream) == 0 { + if len(lineageCol.Upstream) == 0 { if err := p.addColumnToAsset(asset, lineageCol.Name, nil, &Column{ Name: lineageCol.Name, Type: lineageCol.Type, @@ -162,7 +162,7 @@ func (p *LineageExtractor) processLineageColumns(foundPipeline *Pipeline, asset continue } - for _, upstream := range *lineageCol.Upstream { + for _, upstream := range lineageCol.Upstream { if upstream.Column == "*" { continue } diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index 3d8ffe385..7ba38ef8b 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -429,7 +429,7 @@ type Column struct { UpdateOnMerge bool `json:"update_on_merge" yaml:"update_on_merge,omitempty" mapstructure:"update_on_merge"` Extends string `json:"-" yaml:"extends,omitempty" mapstructure:"extends"` Checks []ColumnCheck `json:"checks" yaml:"checks,omitempty" mapstructure:"checks"` - Upstreams []*UpstreamColumn `json:"upstreams,omitempty" yaml:"-" mapstructure:"-"` + Upstreams []*UpstreamColumn `json:"upstreams" yaml:"-" mapstructure:"-"` } func (c *Column) HasCheck(check string) bool { @@ -505,10 +505,10 @@ type DependsColumn struct { } type Upstream struct { - Type string `json:"type" yaml:"type" mapstructure:"type"` - Value string `json:"value" yaml:"value" mapstructure:"value"` - Metadata EmptyStringMap `json:"metadata,omitempty" yaml:"metadata,omitempty" mapstructure:"metadata"` - Columns *[]DependsColumn `json:"columns,omitempty" yaml:"columns,omitempty" mapstructure:"columns"` + Type string `json:"type" yaml:"type" mapstructure:"type"` + Value string `json:"value" yaml:"value" mapstructure:"value"` + Metadata EmptyStringMap `json:"metadata,omitempty" yaml:"metadata,omitempty" mapstructure:"metadata"` + Columns []DependsColumn `json:"columns" yaml:"columns" mapstructure:"columns"` } func (u Upstream) MarshalYAML() (interface{}, error) { From 74f2d151a3d9cc40e9da231b4aeda56cd24ed60f Mon Sep 17 00:00:00 2001 From: y-bruin Date: Thu, 12 Dec 2024 16:23:58 +0530 Subject: [PATCH 03/11] Update ColumnLineage struct to require upstream columns in JSON serialization --- pkg/sqlparser/parser.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sqlparser/parser.go b/pkg/sqlparser/parser.go index 6531693e3..f78dbeb56 100644 --- a/pkg/sqlparser/parser.go +++ b/pkg/sqlparser/parser.go @@ -121,7 +121,7 @@ type UpstreamColumn struct { type ColumnLineage struct { Name string `json:"name"` - Upstream []UpstreamColumn `json:"upstream,omitempty"` + Upstream []UpstreamColumn `json:"upstream"` Type string `json:"type"` } type Lineage struct { From 10211d122d7be45140676eb5814abfb22be3b54e Mon Sep 17 00:00:00 2001 From: y-bruin Date: Thu, 12 Dec 2024 16:24:53 +0530 Subject: [PATCH 04/11] revert --- pkg/pipeline/comment.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/pipeline/comment.go b/pkg/pipeline/comment.go index da07efbf2..bd88bcbba 100644 --- a/pkg/pipeline/comment.go +++ b/pkg/pipeline/comment.go @@ -211,7 +211,7 @@ func commentRowsToTask(commentRows []string) (*Asset, error) { case "depends": values := strings.Split(value, ",") for _, v := range values { - task.Upstreams = append(task.Upstreams, Upstream{Type: "asset", Value: strings.TrimSpace(v), Columns: []DependsColumn{}}) + task.Upstreams = append(task.Upstreams, Upstream{Type: "asset", Value: strings.TrimSpace(v), Columns: make([]DependsColumn, 0)}) } continue From 47d6c3af1057e2436b1cebf9adcd718ede6ff352 Mon Sep 17 00:00:00 2001 From: y-bruin Date: Thu, 12 Dec 2024 17:10:02 +0530 Subject: [PATCH 05/11] Enhance asset expectations by adding 'columns' field to upstreams and updating file paths to absolute locations. This change ensures better clarity in asset dependencies and improves the handling of upstream columns across multiple expectation files. --- .../happy-path/expectations/asset.py.json | 17 +++++--- .../expectations/chess_games.asset.yml.json | 40 ++++++++++--------- .../chess_profiles.asset.yml.json | 14 ++++--- .../happy-path/expectations/pipeline.yml.json | 16 +++++--- .../expectations/player_summary.sql.json | 23 +++++++---- .../lineage/expectations/lineage-asset.json | 15 ++++--- .../lineage/expectations/lineage.json | 33 ++++++++------- pkg/pipeline/pipeline.go | 2 +- 8 files changed, 97 insertions(+), 63 deletions(-) diff --git a/integration-tests/happy-path/expectations/asset.py.json b/integration-tests/happy-path/expectations/asset.py.json index c6c58048a..ded223c43 100644 --- a/integration-tests/happy-path/expectations/asset.py.json +++ b/integration-tests/happy-path/expectations/asset.py.json @@ -11,7 +11,8 @@ "upstreams": [ { "type": "asset", - "value": "chess_playground.player_summary" + "value": "chess_playground.player_summary", + "columns": [] } ], "image": "python:3.11", @@ -64,7 +65,8 @@ "upstreams": [ { "type": "asset", - "value": "chess_playground.player_summary" + "value": "chess_playground.player_summary", + "columns": [] } ], "image": "python:3.11", @@ -181,11 +183,13 @@ "upstreams": [ { "type": "asset", - "value": "chess_playground.games" + "value": "chess_playground.games", + "columns": [] }, { "type": "asset", - "value": "chess_playground.profiles" + "value": "chess_playground.profiles", + "columns": [] } ], "image": "", @@ -218,7 +222,8 @@ "value": null, "blocking": true } - ] + ], + "upstreams": null } ], "custom_checks": [], @@ -241,4 +246,4 @@ "repo": { "path": "/integration-tests" } -} \ No newline at end of file +} diff --git a/integration-tests/happy-path/expectations/chess_games.asset.yml.json b/integration-tests/happy-path/expectations/chess_games.asset.yml.json index c321a7e46..c400ec088 100644 --- a/integration-tests/happy-path/expectations/chess_games.asset.yml.json +++ b/integration-tests/happy-path/expectations/chess_games.asset.yml.json @@ -14,12 +14,12 @@ "owner": "", "executable_file": { "name": "chess_games.asset.yml", - "path": "/integration-tests/happy-path/assets/chess_games.asset.yml", + "path": "/Users/yuvraj/Workspace/bruin/integration-tests/happy-path/assets/chess_games.asset.yml", "content": "name: chess_playground.games\ntype: ingestr\nparameters:\n source_connection: chess-default\n source_table: games\n destination: duckdb" }, "definition_file": { "name": "chess_games.asset.yml", - "path": "/integration-tests/happy-path/assets/chess_games.asset.yml", + "path": "/Users/yuvraj/Workspace/bruin/integration-tests/happy-path/assets/chess_games.asset.yml", "type": "yaml" }, "parameters": { @@ -41,7 +41,7 @@ "start_date": "", "definition_file": { "name": "pipeline.yml", - "path": "/integration-tests/happy-path/pipeline.yml" + "path": "/Users/yuvraj/Workspace/bruin/integration-tests/happy-path/pipeline.yml" }, "default_parameters": {}, "default_connections": {}, @@ -58,7 +58,8 @@ "upstreams": [ { "type": "asset", - "value": "chess_playground.player_summary" + "value": "chess_playground.player_summary", + "columns": [] } ], "image": "python:3.11", @@ -66,12 +67,12 @@ "owner": "", "executable_file": { "name": "asset.py", - "path": "/integration-tests/happy-path/assets/asset.py", + "path": "/Users/yuvraj/Workspace/bruin/integration-tests/happy-path/assets/asset.py", "content": "import os\nimport duckdb\n\nif os.getenv('INJECTED1') != \"value1\":\n raise Exception(\"KEY1 is not injected correctly\")\n\ncon = duckdb.connect(database = \"duckdb.db\", read_only = False)\n\ncon.execute(\"SELECT * FROM chess_playground.player_summary\")\nresult = con.fetchall()\nif len(result) != 2:\n raise Exception(\"Incorrect number of rows in player_summary\")" }, "definition_file": { "name": "asset.py", - "path": "/integration-tests/happy-path/assets/asset.py", + "path": "/Users/yuvraj/Workspace/bruin/integration-tests/happy-path/assets/asset.py", "type": "comment" }, "parameters": {}, @@ -102,12 +103,12 @@ "owner": "", "executable_file": { "name": "chess_games.asset.yml", - "path": "/integration-tests/happy-path/assets/chess_games.asset.yml", + "path": "/Users/yuvraj/Workspace/bruin/integration-tests/happy-path/assets/chess_games.asset.yml", "content": "name: chess_playground.games\ntype: ingestr\nparameters:\n source_connection: chess-default\n source_table: games\n destination: duckdb" }, "definition_file": { "name": "chess_games.asset.yml", - "path": "/integration-tests/happy-path/assets/chess_games.asset.yml", + "path": "/Users/yuvraj/Workspace/bruin/integration-tests/happy-path/assets/chess_games.asset.yml", "type": "yaml" }, "parameters": { @@ -137,12 +138,12 @@ "owner": "", "executable_file": { "name": "chess_profiles.asset.yml", - "path": "/integration-tests/happy-path/assets/chess_profiles.asset.yml", + "path": "/Users/yuvraj/Workspace/bruin/integration-tests/happy-path/assets/chess_profiles.asset.yml", "content": "name: chess_playground.profiles\ntype: ingestr\nparameters:\n source_connection: chess-default\n source_table: profiles\n destination: duckdb" }, "definition_file": { "name": "chess_profiles.asset.yml", - "path": "/integration-tests/happy-path/assets/chess_profiles.asset.yml", + "path": "/Users/yuvraj/Workspace/bruin/integration-tests/happy-path/assets/chess_profiles.asset.yml", "type": "yaml" }, "parameters": { @@ -175,11 +176,13 @@ "upstreams": [ { "type": "asset", - "value": "chess_playground.games" + "value": "chess_playground.games", + "columns": [] }, { "type": "asset", - "value": "chess_playground.profiles" + "value": "chess_playground.profiles", + "columns": [] } ], "image": "", @@ -187,12 +190,12 @@ "owner": "", "executable_file": { "name": "player_summary.sql", - "path": "/integration-tests/happy-path/assets/player_summary.sql", - "content": "WITH game_results AS (\n SELECT\n CASE\n WHEN g.white-\u003e\u003e'result' = 'win' THEN g.white-\u003e\u003e'@id'\n WHEN g.black-\u003e\u003e'result' = 'win' THEN g.black-\u003e\u003e'@id'\n ELSE NULL\n END AS winner_aid,\n g.white-\u003e\u003e'@id' AS white_aid,\n g.black-\u003e\u003e'@id' AS black_aid\nFROM chess_playground.games g\n)\n\nSELECT\n p.username,\n p.aid,\n COUNT(*) AS total_games,\n COUNT(CASE WHEN g.white_aid = p.aid AND g.winner_aid = p.aid THEN 1 END) AS white_wins,\n COUNT(CASE WHEN g.black_aid = p.aid AND g.winner_aid = p.aid THEN 1 END) AS black_wins,\n COUNT(CASE WHEN g.white_aid = p.aid THEN 1 END) AS white_games,\n COUNT(CASE WHEN g.black_aid = p.aid THEN 1 END) AS black_games,\n ROUND(COUNT(CASE WHEN g.white_aid = p.aid AND g.winner_aid = p.aid THEN 1 END) * 100.0 / NULLIF(COUNT(CASE WHEN g.white_aid = p.aid THEN 1 END), 0), 2) AS white_win_rate,\n ROUND(COUNT(CASE WHEN g.black_aid = p.aid AND g.winner_aid = p.aid THEN 1 END) * 100.0 / NULLIF(COUNT(CASE WHEN g.black_aid = p.aid THEN 1 END), 0), 2) AS black_win_rate\nFROM chess_playground.profiles p\nLEFT JOIN game_results g\n ON p.aid IN (g.white_aid, g.black_aid)\nGROUP BY p.username, p.aid\nORDER BY total_games DESC" + "path": "/Users/yuvraj/Workspace/bruin/integration-tests/happy-path/assets/player_summary.sql", + "content": "WITH game_results AS (\n SELECT\n CASE\n WHEN g.white->>'result' = 'win' THEN g.white->>'@id'\n WHEN g.black->>'result' = 'win' THEN g.black->>'@id'\n ELSE NULL\n END AS winner_aid,\n g.white->>'@id' AS white_aid,\n g.black->>'@id' AS black_aid\nFROM chess_playground.games g\n)\n\nSELECT\n p.username,\n p.aid,\n COUNT(*) AS total_games,\n COUNT(CASE WHEN g.white_aid = p.aid AND g.winner_aid = p.aid THEN 1 END) AS white_wins,\n COUNT(CASE WHEN g.black_aid = p.aid AND g.winner_aid = p.aid THEN 1 END) AS black_wins,\n COUNT(CASE WHEN g.white_aid = p.aid THEN 1 END) AS white_games,\n COUNT(CASE WHEN g.black_aid = p.aid THEN 1 END) AS black_games,\n ROUND(COUNT(CASE WHEN g.white_aid = p.aid AND g.winner_aid = p.aid THEN 1 END) * 100.0 / NULLIF(COUNT(CASE WHEN g.white_aid = p.aid THEN 1 END), 0), 2) AS white_win_rate,\n ROUND(COUNT(CASE WHEN g.black_aid = p.aid AND g.winner_aid = p.aid THEN 1 END) * 100.0 / NULLIF(COUNT(CASE WHEN g.black_aid = p.aid THEN 1 END), 0), 2) AS black_win_rate\nFROM chess_playground.profiles p\nLEFT JOIN game_results g\n ON p.aid IN (g.white_aid, g.black_aid)\nGROUP BY p.username, p.aid\nORDER BY total_games DESC" }, "definition_file": { "name": "player_summary.sql", - "path": "/integration-tests/happy-path/assets/player_summary.sql", + "path": "/Users/yuvraj/Workspace/bruin/integration-tests/happy-path/assets/player_summary.sql", "type": "comment" }, "parameters": {}, @@ -212,7 +215,8 @@ "value": null, "blocking": true } - ] + ], + "upstreams": null } ], "custom_checks": [], @@ -233,6 +237,6 @@ "retries": 0 }, "repo": { - "path": "/integration-tests" + "path": "/Users/yuvraj/Workspace/bruin/integration-tests" } -} \ No newline at end of file +} diff --git a/integration-tests/happy-path/expectations/chess_profiles.asset.yml.json b/integration-tests/happy-path/expectations/chess_profiles.asset.yml.json index 2771cf675..9fe4ce9e9 100644 --- a/integration-tests/happy-path/expectations/chess_profiles.asset.yml.json +++ b/integration-tests/happy-path/expectations/chess_profiles.asset.yml.json @@ -58,7 +58,8 @@ "upstreams": [ { "type": "asset", - "value": "chess_playground.player_summary" + "value": "chess_playground.player_summary", + "columns": [] } ], "image": "python:3.11", @@ -175,11 +176,13 @@ "upstreams": [ { "type": "asset", - "value": "chess_playground.games" + "value": "chess_playground.games", + "columns": [] }, { "type": "asset", - "value": "chess_playground.profiles" + "value": "chess_playground.profiles", + "columns": [] } ], "image": "", @@ -212,7 +215,8 @@ "value": null, "blocking": true } - ] + ], + "upstreams": null } ], "custom_checks": [], @@ -235,4 +239,4 @@ "repo": { "path": "/integration-tests" } -} \ No newline at end of file +} diff --git a/integration-tests/happy-path/expectations/pipeline.yml.json b/integration-tests/happy-path/expectations/pipeline.yml.json index ac816d42a..359c82b3f 100644 --- a/integration-tests/happy-path/expectations/pipeline.yml.json +++ b/integration-tests/happy-path/expectations/pipeline.yml.json @@ -22,7 +22,8 @@ "upstreams": [ { "type": "asset", - "value": "chess_playground.player_summary" + "value": "chess_playground.player_summary", + "columns": [] } ], "image": "python:3.11", @@ -139,11 +140,13 @@ "upstreams": [ { "type": "asset", - "value": "chess_playground.games" + "value": "chess_playground.games", + "columns": [] }, { "type": "asset", - "value": "chess_playground.profiles" + "value": "chess_playground.profiles", + "columns": [] } ], "image": "", @@ -151,12 +154,12 @@ "owner": "", "executable_file": { "name": "player_summary.sql", - "path": "__ASSETSDIR__player_summary.sql", + "path": "/Users/yuvraj/Workspace/bruin/integration-tests/happy-path/assets/player_summary.sql", "content": "" }, "definition_file": { "name": "player_summary.sql", - "path": "__ASSETSDIR__player_summary.sql", + "path": "/Users/yuvraj/Workspace/bruin/integration-tests/happy-path/assets/player_summary.sql", "type": "comment" }, "parameters": {}, @@ -176,7 +179,8 @@ "value": null, "blocking": true } - ] + ], + "upstreams": null } ], "custom_checks": [], diff --git a/integration-tests/happy-path/expectations/player_summary.sql.json b/integration-tests/happy-path/expectations/player_summary.sql.json index b5ec2f21c..0ec40a829 100644 --- a/integration-tests/happy-path/expectations/player_summary.sql.json +++ b/integration-tests/happy-path/expectations/player_summary.sql.json @@ -17,11 +17,13 @@ "upstreams": [ { "type": "asset", - "value": "chess_playground.games" + "value": "chess_playground.games", + "columns": [] }, { "type": "asset", - "value": "chess_playground.profiles" + "value": "chess_playground.profiles", + "columns": [] } ], "image": "", @@ -54,7 +56,8 @@ "value": null, "blocking": true } - ] + ], + "upstreams": null } ], "custom_checks": [], @@ -86,7 +89,8 @@ "upstreams": [ { "type": "asset", - "value": "chess_playground.player_summary" + "value": "chess_playground.player_summary", + "columns": [] } ], "image": "python:3.11", @@ -203,11 +207,13 @@ "upstreams": [ { "type": "asset", - "value": "chess_playground.games" + "value": "chess_playground.games", + "columns": [] }, { "type": "asset", - "value": "chess_playground.profiles" + "value": "chess_playground.profiles", + "columns": [] } ], "image": "", @@ -240,7 +246,8 @@ "value": null, "blocking": true } - ] + ], + "upstreams": null } ], "custom_checks": [], @@ -263,4 +270,4 @@ "repo": { "path": "/integration-tests" } -} \ No newline at end of file +} diff --git a/integration-tests/lineage/expectations/lineage-asset.json b/integration-tests/lineage/expectations/lineage-asset.json index cfb118fe6..eff34af2e 100644 --- a/integration-tests/lineage/expectations/lineage-asset.json +++ b/integration-tests/lineage/expectations/lineage-asset.json @@ -528,7 +528,8 @@ "description": "Just a number", "primary_key": true, "update_on_merge": false, - "checks": [] + "checks": [], + "upstreams": null }, { "entity_attribute": null, @@ -537,7 +538,8 @@ "description": "Just a name", "primary_key": false, "update_on_merge": false, - "checks": [] + "checks": [], + "upstreams": null }, { "entity_attribute": null, @@ -546,7 +548,8 @@ "description": "Just a last name", "primary_key": false, "update_on_merge": false, - "checks": [] + "checks": [], + "upstreams": null }, { "entity_attribute": null, @@ -555,7 +558,8 @@ "description": "Just a country", "primary_key": false, "update_on_merge": false, - "checks": [] + "checks": [], + "upstreams": null }, { "entity_attribute": null, @@ -564,7 +568,8 @@ "description": "Just a timestamp", "primary_key": false, "update_on_merge": false, - "checks": [] + "checks": [], + "upstreams": null } ], "custom_checks": [], diff --git a/integration-tests/lineage/expectations/lineage.json b/integration-tests/lineage/expectations/lineage.json index 2cc073677..4685efcb9 100644 --- a/integration-tests/lineage/expectations/lineage.json +++ b/integration-tests/lineage/expectations/lineage.json @@ -5,7 +5,7 @@ "start_date": "", "definition_file": { "name": "pipeline.yml", - "path": "__PIPELINEDIR__pipeline.yml" + "path": "/Users/yuvraj/Workspace/bruin/integration-tests/lineage/pipeline.yml" }, "default_parameters": {}, "default_connections": {}, @@ -46,12 +46,12 @@ "owner": "", "executable_file": { "name": "country.sql", - "path": "__ASSETSDIR__country.sql", + "path": "/Users/yuvraj/Workspace/bruin/integration-tests/lineage/assets/country.sql", "content": "" }, "definition_file": { "name": "country.sql", - "path": "__ASSETSDIR__country.sql", + "path": "/Users/yuvraj/Workspace/bruin/integration-tests/lineage/assets/country.sql", "type": "comment" }, "parameters": {}, @@ -151,12 +151,12 @@ "owner": "", "executable_file": { "name": "example.sql", - "path": "__ASSETSDIR__example.sql", + "path": "/Users/yuvraj/Workspace/bruin/integration-tests/lineage/assets/example.sql", "content": "" }, "definition_file": { "name": "example.sql", - "path": "__ASSETSDIR__example.sql", + "path": "/Users/yuvraj/Workspace/bruin/integration-tests/lineage/assets/example.sql", "type": "comment" }, "parameters": {}, @@ -276,12 +276,12 @@ "owner": "", "executable_file": { "name": "people.sql", - "path": "__ASSETSDIR__people.sql", + "path": "/Users/yuvraj/Workspace/bruin/integration-tests/lineage/assets/people.sql", "content": "" }, "definition_file": { "name": "people.sql", - "path": "__ASSETSDIR__people.sql", + "path": "/Users/yuvraj/Workspace/bruin/integration-tests/lineage/assets/people.sql", "type": "comment" }, "parameters": {}, @@ -374,12 +374,12 @@ "owner": "", "executable_file": { "name": "users.sql", - "path": "__ASSETSDIR__users.sql", + "path": "/Users/yuvraj/Workspace/bruin/integration-tests/lineage/assets/users.sql", "content": "" }, "definition_file": { "name": "users.sql", - "path": "__ASSETSDIR__users.sql", + "path": "/Users/yuvraj/Workspace/bruin/integration-tests/lineage/assets/users.sql", "type": "comment" }, "parameters": {}, @@ -392,7 +392,8 @@ "description": "Just a number", "primary_key": true, "update_on_merge": false, - "checks": [] + "checks": [], + "upstreams": null }, { "entity_attribute": null, @@ -401,7 +402,8 @@ "description": "Just a name", "primary_key": false, "update_on_merge": false, - "checks": [] + "checks": [], + "upstreams": null }, { "entity_attribute": null, @@ -410,7 +412,8 @@ "description": "Just a last name", "primary_key": false, "update_on_merge": false, - "checks": [] + "checks": [], + "upstreams": null }, { "entity_attribute": null, @@ -419,7 +422,8 @@ "description": "Just a country", "primary_key": false, "update_on_merge": false, - "checks": [] + "checks": [], + "upstreams": null }, { "entity_attribute": null, @@ -428,7 +432,8 @@ "description": "Just a timestamp", "primary_key": false, "update_on_merge": false, - "checks": [] + "checks": [], + "upstreams": null } ], "custom_checks": [], diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index 7ba38ef8b..e56bee207 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -429,7 +429,7 @@ type Column struct { UpdateOnMerge bool `json:"update_on_merge" yaml:"update_on_merge,omitempty" mapstructure:"update_on_merge"` Extends string `json:"-" yaml:"extends,omitempty" mapstructure:"extends"` Checks []ColumnCheck `json:"checks" yaml:"checks,omitempty" mapstructure:"checks"` - Upstreams []*UpstreamColumn `json:"upstreams" yaml:"-" mapstructure:"-"` + Upstreams []*UpstreamColumn `json:"upstreams" yaml:"upstreams" mapstructure:"upstreams"` } func (c *Column) HasCheck(check string) bool { From e136fa4aa5520826cc32db50ab4cda1e14aeb1b5 Mon Sep 17 00:00:00 2001 From: y-bruin Date: Thu, 12 Dec 2024 17:16:10 +0530 Subject: [PATCH 06/11] revert --- .../expectations/chess_games.asset.yml.json | 24 +++++++++---------- .../happy-path/expectations/pipeline.yml.json | 2 +- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/integration-tests/happy-path/expectations/chess_games.asset.yml.json b/integration-tests/happy-path/expectations/chess_games.asset.yml.json index c400ec088..8dd459487 100644 --- a/integration-tests/happy-path/expectations/chess_games.asset.yml.json +++ b/integration-tests/happy-path/expectations/chess_games.asset.yml.json @@ -14,12 +14,12 @@ "owner": "", "executable_file": { "name": "chess_games.asset.yml", - "path": "/Users/yuvraj/Workspace/bruin/integration-tests/happy-path/assets/chess_games.asset.yml", + "path": "/integration-tests/happy-path/assets/chess_games.asset.yml", "content": "name: chess_playground.games\ntype: ingestr\nparameters:\n source_connection: chess-default\n source_table: games\n destination: duckdb" }, "definition_file": { "name": "chess_games.asset.yml", - "path": "/Users/yuvraj/Workspace/bruin/integration-tests/happy-path/assets/chess_games.asset.yml", + "path": "/integration-tests/happy-path/assets/chess_games.asset.yml", "type": "yaml" }, "parameters": { @@ -41,7 +41,7 @@ "start_date": "", "definition_file": { "name": "pipeline.yml", - "path": "/Users/yuvraj/Workspace/bruin/integration-tests/happy-path/pipeline.yml" + "path": "/integration-tests/happy-path/pipeline.yml" }, "default_parameters": {}, "default_connections": {}, @@ -67,12 +67,12 @@ "owner": "", "executable_file": { "name": "asset.py", - "path": "/Users/yuvraj/Workspace/bruin/integration-tests/happy-path/assets/asset.py", + "path": "/integration-tests/happy-path/assets/asset.py", "content": "import os\nimport duckdb\n\nif os.getenv('INJECTED1') != \"value1\":\n raise Exception(\"KEY1 is not injected correctly\")\n\ncon = duckdb.connect(database = \"duckdb.db\", read_only = False)\n\ncon.execute(\"SELECT * FROM chess_playground.player_summary\")\nresult = con.fetchall()\nif len(result) != 2:\n raise Exception(\"Incorrect number of rows in player_summary\")" }, "definition_file": { "name": "asset.py", - "path": "/Users/yuvraj/Workspace/bruin/integration-tests/happy-path/assets/asset.py", + "path": "/integration-tests/happy-path/assets/asset.py", "type": "comment" }, "parameters": {}, @@ -103,12 +103,12 @@ "owner": "", "executable_file": { "name": "chess_games.asset.yml", - "path": "/Users/yuvraj/Workspace/bruin/integration-tests/happy-path/assets/chess_games.asset.yml", + "path": "/integration-tests/happy-path/assets/chess_games.asset.yml", "content": "name: chess_playground.games\ntype: ingestr\nparameters:\n source_connection: chess-default\n source_table: games\n destination: duckdb" }, "definition_file": { "name": "chess_games.asset.yml", - "path": "/Users/yuvraj/Workspace/bruin/integration-tests/happy-path/assets/chess_games.asset.yml", + "path": "/integration-tests/happy-path/assets/chess_games.asset.yml", "type": "yaml" }, "parameters": { @@ -138,12 +138,12 @@ "owner": "", "executable_file": { "name": "chess_profiles.asset.yml", - "path": "/Users/yuvraj/Workspace/bruin/integration-tests/happy-path/assets/chess_profiles.asset.yml", + "path": "/integration-tests/happy-path/assets/chess_profiles.asset.yml", "content": "name: chess_playground.profiles\ntype: ingestr\nparameters:\n source_connection: chess-default\n source_table: profiles\n destination: duckdb" }, "definition_file": { "name": "chess_profiles.asset.yml", - "path": "/Users/yuvraj/Workspace/bruin/integration-tests/happy-path/assets/chess_profiles.asset.yml", + "path": "/integration-tests/happy-path/assets/chess_profiles.asset.yml", "type": "yaml" }, "parameters": { @@ -190,12 +190,12 @@ "owner": "", "executable_file": { "name": "player_summary.sql", - "path": "/Users/yuvraj/Workspace/bruin/integration-tests/happy-path/assets/player_summary.sql", + "path": "/integration-tests/happy-path/assets/player_summary.sql", "content": "WITH game_results AS (\n SELECT\n CASE\n WHEN g.white->>'result' = 'win' THEN g.white->>'@id'\n WHEN g.black->>'result' = 'win' THEN g.black->>'@id'\n ELSE NULL\n END AS winner_aid,\n g.white->>'@id' AS white_aid,\n g.black->>'@id' AS black_aid\nFROM chess_playground.games g\n)\n\nSELECT\n p.username,\n p.aid,\n COUNT(*) AS total_games,\n COUNT(CASE WHEN g.white_aid = p.aid AND g.winner_aid = p.aid THEN 1 END) AS white_wins,\n COUNT(CASE WHEN g.black_aid = p.aid AND g.winner_aid = p.aid THEN 1 END) AS black_wins,\n COUNT(CASE WHEN g.white_aid = p.aid THEN 1 END) AS white_games,\n COUNT(CASE WHEN g.black_aid = p.aid THEN 1 END) AS black_games,\n ROUND(COUNT(CASE WHEN g.white_aid = p.aid AND g.winner_aid = p.aid THEN 1 END) * 100.0 / NULLIF(COUNT(CASE WHEN g.white_aid = p.aid THEN 1 END), 0), 2) AS white_win_rate,\n ROUND(COUNT(CASE WHEN g.black_aid = p.aid AND g.winner_aid = p.aid THEN 1 END) * 100.0 / NULLIF(COUNT(CASE WHEN g.black_aid = p.aid THEN 1 END), 0), 2) AS black_win_rate\nFROM chess_playground.profiles p\nLEFT JOIN game_results g\n ON p.aid IN (g.white_aid, g.black_aid)\nGROUP BY p.username, p.aid\nORDER BY total_games DESC" }, "definition_file": { "name": "player_summary.sql", - "path": "/Users/yuvraj/Workspace/bruin/integration-tests/happy-path/assets/player_summary.sql", + "path": "/integration-tests/happy-path/assets/player_summary.sql", "type": "comment" }, "parameters": {}, @@ -237,6 +237,6 @@ "retries": 0 }, "repo": { - "path": "/Users/yuvraj/Workspace/bruin/integration-tests" + "path": "/integration-tests" } } diff --git a/integration-tests/happy-path/expectations/pipeline.yml.json b/integration-tests/happy-path/expectations/pipeline.yml.json index 359c82b3f..f2e6aa54c 100644 --- a/integration-tests/happy-path/expectations/pipeline.yml.json +++ b/integration-tests/happy-path/expectations/pipeline.yml.json @@ -159,7 +159,7 @@ }, "definition_file": { "name": "player_summary.sql", - "path": "/Users/yuvraj/Workspace/bruin/integration-tests/happy-path/assets/player_summary.sql", + "path": "__ASSETSDIR__player_summary.sql", "type": "comment" }, "parameters": {}, From d7f97ef2b3dd62883cb89b795fa10daf415b90e3 Mon Sep 17 00:00:00 2001 From: y-bruin Date: Thu, 12 Dec 2024 17:18:31 +0530 Subject: [PATCH 07/11] revert --- .../happy-path/expectations/pipeline.yml.json | 2 +- .../lineage/expectations/lineage.json | 18 +++++++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/integration-tests/happy-path/expectations/pipeline.yml.json b/integration-tests/happy-path/expectations/pipeline.yml.json index f2e6aa54c..b0916c1f6 100644 --- a/integration-tests/happy-path/expectations/pipeline.yml.json +++ b/integration-tests/happy-path/expectations/pipeline.yml.json @@ -154,7 +154,7 @@ "owner": "", "executable_file": { "name": "player_summary.sql", - "path": "/Users/yuvraj/Workspace/bruin/integration-tests/happy-path/assets/player_summary.sql", + "path": "__ASSETSDIR__player_summary.sql", "content": "" }, "definition_file": { diff --git a/integration-tests/lineage/expectations/lineage.json b/integration-tests/lineage/expectations/lineage.json index 4685efcb9..cde11f6e4 100644 --- a/integration-tests/lineage/expectations/lineage.json +++ b/integration-tests/lineage/expectations/lineage.json @@ -5,7 +5,7 @@ "start_date": "", "definition_file": { "name": "pipeline.yml", - "path": "/Users/yuvraj/Workspace/bruin/integration-tests/lineage/pipeline.yml" + "path": "__PIPELINEDIR__pipeline.yml" }, "default_parameters": {}, "default_connections": {}, @@ -46,12 +46,12 @@ "owner": "", "executable_file": { "name": "country.sql", - "path": "/Users/yuvraj/Workspace/bruin/integration-tests/lineage/assets/country.sql", + "path": "__ASSETSDIR__country.sql", "content": "" }, "definition_file": { "name": "country.sql", - "path": "/Users/yuvraj/Workspace/bruin/integration-tests/lineage/assets/country.sql", + "path": "__ASSETSDIR__country.sql", "type": "comment" }, "parameters": {}, @@ -151,12 +151,12 @@ "owner": "", "executable_file": { "name": "example.sql", - "path": "/Users/yuvraj/Workspace/bruin/integration-tests/lineage/assets/example.sql", + "path": "__ASSETSDIR__example.sql", "content": "" }, "definition_file": { "name": "example.sql", - "path": "/Users/yuvraj/Workspace/bruin/integration-tests/lineage/assets/example.sql", + "path": "__ASSETSDIR__example.sql", "type": "comment" }, "parameters": {}, @@ -276,12 +276,12 @@ "owner": "", "executable_file": { "name": "people.sql", - "path": "/Users/yuvraj/Workspace/bruin/integration-tests/lineage/assets/people.sql", + "path": "__ASSETSDIR__people.sql", "content": "" }, "definition_file": { "name": "people.sql", - "path": "/Users/yuvraj/Workspace/bruin/integration-tests/lineage/assets/people.sql", + "path": "__ASSETSDIR__people.sql", "type": "comment" }, "parameters": {}, @@ -374,12 +374,12 @@ "owner": "", "executable_file": { "name": "users.sql", - "path": "/Users/yuvraj/Workspace/bruin/integration-tests/lineage/assets/users.sql", + "path": "__ASSETSDIR__users.sql", "content": "" }, "definition_file": { "name": "users.sql", - "path": "/Users/yuvraj/Workspace/bruin/integration-tests/lineage/assets/users.sql", + "path": "__ASSETSDIR__users.sql", "type": "comment" }, "parameters": {}, From 101d598be20e327613e878b88f73d39391639573 Mon Sep 17 00:00:00 2001 From: y-bruin Date: Thu, 12 Dec 2024 17:46:49 +0530 Subject: [PATCH 08/11] revert --- .../happy-path/expectations/asset.py.json | 3 +-- .../expectations/chess_games.asset.yml.json | 3 +-- .../expectations/chess_profiles.asset.yml.json | 3 +-- .../happy-path/expectations/pipeline.yml.json | 3 +-- .../expectations/player_summary.sql.json | 6 ++---- .../lineage/expectations/lineage-asset.json | 15 +++++---------- .../lineage/expectations/lineage.json | 15 +++++---------- pkg/pipeline/pipeline.go | 2 +- 8 files changed, 17 insertions(+), 33 deletions(-) diff --git a/integration-tests/happy-path/expectations/asset.py.json b/integration-tests/happy-path/expectations/asset.py.json index ded223c43..e6b542672 100644 --- a/integration-tests/happy-path/expectations/asset.py.json +++ b/integration-tests/happy-path/expectations/asset.py.json @@ -222,8 +222,7 @@ "value": null, "blocking": true } - ], - "upstreams": null + ] } ], "custom_checks": [], diff --git a/integration-tests/happy-path/expectations/chess_games.asset.yml.json b/integration-tests/happy-path/expectations/chess_games.asset.yml.json index 8dd459487..ada96e84d 100644 --- a/integration-tests/happy-path/expectations/chess_games.asset.yml.json +++ b/integration-tests/happy-path/expectations/chess_games.asset.yml.json @@ -215,8 +215,7 @@ "value": null, "blocking": true } - ], - "upstreams": null + ] } ], "custom_checks": [], diff --git a/integration-tests/happy-path/expectations/chess_profiles.asset.yml.json b/integration-tests/happy-path/expectations/chess_profiles.asset.yml.json index 9fe4ce9e9..b73f40d1f 100644 --- a/integration-tests/happy-path/expectations/chess_profiles.asset.yml.json +++ b/integration-tests/happy-path/expectations/chess_profiles.asset.yml.json @@ -215,8 +215,7 @@ "value": null, "blocking": true } - ], - "upstreams": null + ] } ], "custom_checks": [], diff --git a/integration-tests/happy-path/expectations/pipeline.yml.json b/integration-tests/happy-path/expectations/pipeline.yml.json index b0916c1f6..0cc5aef38 100644 --- a/integration-tests/happy-path/expectations/pipeline.yml.json +++ b/integration-tests/happy-path/expectations/pipeline.yml.json @@ -179,8 +179,7 @@ "value": null, "blocking": true } - ], - "upstreams": null + ] } ], "custom_checks": [], diff --git a/integration-tests/happy-path/expectations/player_summary.sql.json b/integration-tests/happy-path/expectations/player_summary.sql.json index 0ec40a829..e20ae4ae0 100644 --- a/integration-tests/happy-path/expectations/player_summary.sql.json +++ b/integration-tests/happy-path/expectations/player_summary.sql.json @@ -56,8 +56,7 @@ "value": null, "blocking": true } - ], - "upstreams": null + ] } ], "custom_checks": [], @@ -246,8 +245,7 @@ "value": null, "blocking": true } - ], - "upstreams": null + ] } ], "custom_checks": [], diff --git a/integration-tests/lineage/expectations/lineage-asset.json b/integration-tests/lineage/expectations/lineage-asset.json index eff34af2e..cfb118fe6 100644 --- a/integration-tests/lineage/expectations/lineage-asset.json +++ b/integration-tests/lineage/expectations/lineage-asset.json @@ -528,8 +528,7 @@ "description": "Just a number", "primary_key": true, "update_on_merge": false, - "checks": [], - "upstreams": null + "checks": [] }, { "entity_attribute": null, @@ -538,8 +537,7 @@ "description": "Just a name", "primary_key": false, "update_on_merge": false, - "checks": [], - "upstreams": null + "checks": [] }, { "entity_attribute": null, @@ -548,8 +546,7 @@ "description": "Just a last name", "primary_key": false, "update_on_merge": false, - "checks": [], - "upstreams": null + "checks": [] }, { "entity_attribute": null, @@ -558,8 +555,7 @@ "description": "Just a country", "primary_key": false, "update_on_merge": false, - "checks": [], - "upstreams": null + "checks": [] }, { "entity_attribute": null, @@ -568,8 +564,7 @@ "description": "Just a timestamp", "primary_key": false, "update_on_merge": false, - "checks": [], - "upstreams": null + "checks": [] } ], "custom_checks": [], diff --git a/integration-tests/lineage/expectations/lineage.json b/integration-tests/lineage/expectations/lineage.json index cde11f6e4..2cc073677 100644 --- a/integration-tests/lineage/expectations/lineage.json +++ b/integration-tests/lineage/expectations/lineage.json @@ -392,8 +392,7 @@ "description": "Just a number", "primary_key": true, "update_on_merge": false, - "checks": [], - "upstreams": null + "checks": [] }, { "entity_attribute": null, @@ -402,8 +401,7 @@ "description": "Just a name", "primary_key": false, "update_on_merge": false, - "checks": [], - "upstreams": null + "checks": [] }, { "entity_attribute": null, @@ -412,8 +410,7 @@ "description": "Just a last name", "primary_key": false, "update_on_merge": false, - "checks": [], - "upstreams": null + "checks": [] }, { "entity_attribute": null, @@ -422,8 +419,7 @@ "description": "Just a country", "primary_key": false, "update_on_merge": false, - "checks": [], - "upstreams": null + "checks": [] }, { "entity_attribute": null, @@ -432,8 +428,7 @@ "description": "Just a timestamp", "primary_key": false, "update_on_merge": false, - "checks": [], - "upstreams": null + "checks": [] } ], "custom_checks": [], diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index e56bee207..e7c9181ba 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -429,7 +429,7 @@ type Column struct { UpdateOnMerge bool `json:"update_on_merge" yaml:"update_on_merge,omitempty" mapstructure:"update_on_merge"` Extends string `json:"-" yaml:"extends,omitempty" mapstructure:"extends"` Checks []ColumnCheck `json:"checks" yaml:"checks,omitempty" mapstructure:"checks"` - Upstreams []*UpstreamColumn `json:"upstreams" yaml:"upstreams" mapstructure:"upstreams"` + Upstreams []*UpstreamColumn `json:"upstreams,omitempty" yaml:"upstreams" mapstructure:"upstreams"` } func (c *Column) HasCheck(check string) bool { From 9c3264da3edfa57bafdaa409f07dd11b238d2151 Mon Sep 17 00:00:00 2001 From: y-bruin Date: Thu, 12 Dec 2024 18:50:16 +0530 Subject: [PATCH 09/11] fix tests --- pkg/pipeline/pipeline.go | 2 +- .../pipeline/first-pipeline_unix.json | 39 ++++++++++++------- .../pipeline/first-pipeline_windows.json | 36 +++++++++++------ .../pipeline/second-pipeline_unix.json | 15 ++++--- .../pipeline/second-pipeline_windows.json | 15 ++++--- 5 files changed, 71 insertions(+), 36 deletions(-) diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index e7c9181ba..0d4270f92 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -429,7 +429,7 @@ type Column struct { UpdateOnMerge bool `json:"update_on_merge" yaml:"update_on_merge,omitempty" mapstructure:"update_on_merge"` Extends string `json:"-" yaml:"extends,omitempty" mapstructure:"extends"` Checks []ColumnCheck `json:"checks" yaml:"checks,omitempty" mapstructure:"checks"` - Upstreams []*UpstreamColumn `json:"upstreams,omitempty" yaml:"upstreams" mapstructure:"upstreams"` + Upstreams []*UpstreamColumn `json:"upstreams,omitempty" yaml:"-" mapstructure:"-"` } func (c *Column) HasCheck(check string) bool { diff --git a/pkg/pipeline/testdata/pipeline/first-pipeline_unix.json b/pkg/pipeline/testdata/pipeline/first-pipeline_unix.json index 6516be086..5f76175fb 100644 --- a/pkg/pipeline/testdata/pipeline/first-pipeline_unix.json +++ b/pkg/pipeline/testdata/pipeline/first-pipeline_unix.json @@ -54,7 +54,8 @@ "upstreams": [ { "type": "asset", - "value": "gcs-to-bq" + "value": "gcs-to-bq", + "columns": [] } ] }, @@ -129,27 +130,33 @@ "upstreams": [ { "type": "asset", - "value": "task1" + "value": "task1", + "columns": [] }, { "type": "asset", - "value": "task2" + "value": "task2", + "columns": [] }, { "type": "asset", - "value": "task3" + "value": "task3", + "columns": [] }, { "type": "asset", - "value": "task4" + "value": "task4", + "columns": [] }, { "type": "asset", - "value": "task5" + "value": "task5", + "columns": [] }, { "type": "asset", - "value": "task3" + "value": "task3", + "columns": [] } ] }, @@ -188,27 +195,33 @@ "upstreams": [ { "type": "asset", - "value": "task1" + "value": "task1", + "columns": [] }, { "type": "asset", - "value": "task2" + "value": "task2", + "columns": [] }, { "type": "asset", - "value": "task3" + "value": "task3", + "columns": [] }, { "type": "asset", - "value": "task4" + "value": "task4", + "columns": [] }, { "type": "asset", - "value": "task5" + "value": "task5", + "columns": [] }, { "type": "asset", - "value": "task3" + "value": "task3", + "columns": [] } ] } diff --git a/pkg/pipeline/testdata/pipeline/first-pipeline_windows.json b/pkg/pipeline/testdata/pipeline/first-pipeline_windows.json index b887a8015..5ec448e3f 100644 --- a/pkg/pipeline/testdata/pipeline/first-pipeline_windows.json +++ b/pkg/pipeline/testdata/pipeline/first-pipeline_windows.json @@ -121,27 +121,33 @@ "upstreams": [ { "type": "asset", - "value": "task1" + "value": "task1", + "columns": [] }, { "type": "asset", - "value": "task2" + "value": "task2", + "columns": [] }, { "type": "asset", - "value": "task3" + "value": "task3", + "columns": [] }, { "type": "asset", - "value": "task4" + "value": "task4", + "columns": [] }, { "type": "asset", - "value": "task5" + "value": "task5", + "columns": [] }, { "type": "asset", - "value": "task3" + "value": "task3", + "columns": [] } ], "materialization": null, @@ -180,27 +186,33 @@ "upstreams": [ { "type": "asset", - "value": "task1" + "value": "task1", + "columns": [] }, { "type": "asset", - "value": "task2" + "value": "task2", + "columns": [] }, { "type": "asset", - "value": "task3" + "value": "task3", + "columns": [] }, { "type": "asset", - "value": "task4" + "value": "task4", + "columns": [] }, { "type": "asset", - "value": "task5" + "value": "task5", + "columns": [] }, { "type": "asset", - "value": "task3" + "value": "task3", + "columns": [] } ], "materialization": null, diff --git a/pkg/pipeline/testdata/pipeline/second-pipeline_unix.json b/pkg/pipeline/testdata/pipeline/second-pipeline_unix.json index f824d25f7..8699943a0 100644 --- a/pkg/pipeline/testdata/pipeline/second-pipeline_unix.json +++ b/pkg/pipeline/testdata/pipeline/second-pipeline_unix.json @@ -194,23 +194,28 @@ "upstreams": [ { "type": "asset", - "value": "task1" + "value": "task1", + "columns": [] }, { "type": "asset", - "value": "task2" + "value": "task2", + "columns": [] }, { "type": "asset", - "value": "task3" + "value": "task3", + "columns": [] }, { "type": "asset", - "value": "task4" + "value": "task4", + "columns": [] }, { "type": "asset", - "value": "task5" + "value": "task5", + "columns": [] } ] } diff --git a/pkg/pipeline/testdata/pipeline/second-pipeline_windows.json b/pkg/pipeline/testdata/pipeline/second-pipeline_windows.json index 23c91a6ed..0c9795e53 100644 --- a/pkg/pipeline/testdata/pipeline/second-pipeline_windows.json +++ b/pkg/pipeline/testdata/pipeline/second-pipeline_windows.json @@ -124,11 +124,16 @@ ], "athena": null, "upstreams": [ - {"type" : "asset", "value" : "task1"}, - {"type" : "asset", "value" : "task2"}, - {"type" : "asset", "value" : "task3"}, - {"type" : "asset", "value" : "task4"}, - {"type" : "asset", "value" : "task5"} + {"type" : "asset", "value" : "task1", + "columns": []}, + {"type" : "asset", "value" : "task2", + "columns": []}, + {"type" : "asset", "value" : "task3", + "columns": []}, + {"type" : "asset", "value" : "task4", + "columns": []}, + {"type" : "asset", "value" : "task5", + "columns": []} ], "materialization": null, "columns": [ From 5e454e489743837fcf0a035bec70adab75632cd6 Mon Sep 17 00:00:00 2001 From: y-bruin Date: Thu, 12 Dec 2024 18:55:23 +0530 Subject: [PATCH 10/11] Update Upstream struct to mark 'columns' field as optional in JSON/YAML serialization. --- pkg/pipeline/pipeline.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index 0d4270f92..d58dee067 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -508,7 +508,7 @@ type Upstream struct { Type string `json:"type" yaml:"type" mapstructure:"type"` Value string `json:"value" yaml:"value" mapstructure:"value"` Metadata EmptyStringMap `json:"metadata,omitempty" yaml:"metadata,omitempty" mapstructure:"metadata"` - Columns []DependsColumn `json:"columns" yaml:"columns" mapstructure:"columns"` + Columns []DependsColumn `json:"columns" yaml:"columns,omitempty" mapstructure:"columns"` } func (u Upstream) MarshalYAML() (interface{}, error) { From 0b73ec008c84873388395ec0f8fa65f53485b6af Mon Sep 17 00:00:00 2001 From: y-bruin Date: Thu, 12 Dec 2024 19:03:53 +0530 Subject: [PATCH 11/11] fix windows test --- pkg/pipeline/testdata/pipeline/first-pipeline_windows.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/pipeline/testdata/pipeline/first-pipeline_windows.json b/pkg/pipeline/testdata/pipeline/first-pipeline_windows.json index 5ec448e3f..70b22b766 100644 --- a/pkg/pipeline/testdata/pipeline/first-pipeline_windows.json +++ b/pkg/pipeline/testdata/pipeline/first-pipeline_windows.json @@ -46,7 +46,8 @@ "upstreams": [ { "type": "asset", - "value": "gcs-to-bq" + "value": "gcs-to-bq", + "columns": [] } ], "materialization": null,