Skip to content

Commit

Permalink
Merge branch 'main' into refactor-init
Browse files Browse the repository at this point in the history
  • Loading branch information
terzioglub authored Jan 31, 2025
2 parents abc52e2 + 60e1545 commit 15b83ac
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 9 deletions.
2 changes: 1 addition & 1 deletion docs/assets/seed.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Seed Assets
Seeds are CSV-files that contain data that is prepared outside of your pipeline that will be loaded into your data platform. Bruin supports seed assets natively, allowing you to simply drop a CSV file in your pipeline and ensuring the data is loaded to the destination platform accurately.

You can define seed assets in a file ending with `.yaml`:
You can define seed assets in a file ending with `.asset.yaml`:
```yaml
name: dashboard.hello
type: duckdb.seed
Expand Down
23 changes: 17 additions & 6 deletions pkg/pipeline/lineage.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,16 +190,19 @@ func (p *LineageExtractor) processLineageColumns(foundPipeline *Pipeline, asset
if upstream.Table == asset.Name {
continue
}
upstreamAsset := foundPipeline.GetAssetByName(upstream.Table)
if upstreamAsset == nil {

tableSpec := strings.Split(upstream.Table, ".")

upstreamAsset := foundPipeline.GetAssetByName(tableSpec[len(tableSpec)-1])
if upstreamAsset == nil && upstream.Table != "" {
if err := p.addColumnToAsset(asset, lineageCol.Name, nil, &Column{
Name: upstream.Column,
Type: lineageCol.Type,
Checks: []ColumnCheck{},
Upstreams: []*UpstreamColumn{
{
Column: upstream.Column,
Table: strings.ToLower(upstream.Table),
Table: strings.ToLower(tableSpec[len(tableSpec)-1]),
},
},
}); err != nil {
Expand Down Expand Up @@ -232,14 +235,22 @@ func (p *LineageExtractor) processLineageColumns(foundPipeline *Pipeline, asset

// addColumnToAsset adds a new column to the asset based on upstream information.
func (p *LineageExtractor) addColumnToAsset(asset *Asset, colName string, upstreamAsset *Asset, upstreamCol *Column) error {
if asset == nil || upstreamCol == nil || colName == "" {
if asset == nil || colName == "" {
return errors.New("invalid arguments: all parameters must be non-nil and colName must not be empty")
}

if colName == "*" {
return nil
}

if upstreamAsset == nil {
existingCol := asset.GetColumnWithName(strings.ToLower(upstreamCol.Name))
if existingCol == nil {
asset.Columns = append(asset.Columns, *upstreamCol)
return nil
}
return nil
}
existingCol := asset.GetColumnWithName(colName)
if existingCol != nil {
if len(existingCol.Description) == 0 {
Expand All @@ -254,6 +265,7 @@ func (p *LineageExtractor) addColumnToAsset(asset *Asset, colName string, upstre
newUpstream := UpstreamColumn{
Column: upstreamCol.Name,
}

if upstreamAsset != nil {
newUpstream.Table = upstreamAsset.Name
}
Expand Down Expand Up @@ -293,8 +305,7 @@ func (p *LineageExtractor) addColumnToAsset(asset *Asset, colName string, upstre
// upstreamExists checks if a given upstream already exists in the list.
func upstreamExists(upstreams []*UpstreamColumn, newUpstream UpstreamColumn) bool {
for _, existingUpstream := range upstreams {
if strings.EqualFold(existingUpstream.Column, newUpstream.Column) &&
strings.EqualFold(existingUpstream.Table, newUpstream.Table) {
if strings.EqualFold(existingUpstream.Column, newUpstream.Column) {
return true
}
}
Expand Down
2 changes: 0 additions & 2 deletions pythonsrc/parser/main.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import json
import logging
from dataclasses import dataclass

from sqlglot import parse_one, exp, lineage
from sqlglot.lineage import Node
from sqlglot.optimizer import optimize
Expand Down Expand Up @@ -171,7 +170,6 @@ def get_column_lineage(query: str, schema: dict, dialect: str):
{"column": column.name, "table": column.table}
)
non_selected_columns = list(non_selected_columns_dict.values())

return {
"columns": result,
"non_selected_columns": non_selected_columns,
Expand Down

0 comments on commit 15b83ac

Please sign in to comment.