diff --git a/docs/assets/seed.md b/docs/assets/seed.md index f65fc14d..c011e316 100644 --- a/docs/assets/seed.md +++ b/docs/assets/seed.md @@ -1,7 +1,7 @@ # Seed Assets Seeds are CSV-files that contain data that is prepared outside of your pipeline that will be loaded into your data platform. Bruin supports seed assets natively, allowing you to simply drop a CSV file in your pipeline and ensuring the data is loaded to the destination platform accurately. -You can define seed assets in a file ending with `.yaml`: +You can define seed assets in a file ending with `.asset.yaml`: ```yaml name: dashboard.hello type: duckdb.seed diff --git a/pkg/pipeline/lineage.go b/pkg/pipeline/lineage.go index 9293e218..7515c04d 100644 --- a/pkg/pipeline/lineage.go +++ b/pkg/pipeline/lineage.go @@ -190,8 +190,11 @@ func (p *LineageExtractor) processLineageColumns(foundPipeline *Pipeline, asset if upstream.Table == asset.Name { continue } - upstreamAsset := foundPipeline.GetAssetByName(upstream.Table) - if upstreamAsset == nil { + + tableSpec := strings.Split(upstream.Table, ".") + + upstreamAsset := foundPipeline.GetAssetByName(tableSpec[len(tableSpec)-1]) + if upstreamAsset == nil && upstream.Table != "" { if err := p.addColumnToAsset(asset, lineageCol.Name, nil, &Column{ Name: upstream.Column, Type: lineageCol.Type, @@ -199,7 +202,7 @@ func (p *LineageExtractor) processLineageColumns(foundPipeline *Pipeline, asset Upstreams: []*UpstreamColumn{ { Column: upstream.Column, - Table: strings.ToLower(upstream.Table), + Table: strings.ToLower(tableSpec[len(tableSpec)-1]), }, }, }); err != nil { @@ -232,7 +235,7 @@ func (p *LineageExtractor) processLineageColumns(foundPipeline *Pipeline, asset // addColumnToAsset adds a new column to the asset based on upstream information. func (p *LineageExtractor) addColumnToAsset(asset *Asset, colName string, upstreamAsset *Asset, upstreamCol *Column) error { - if asset == nil || upstreamCol == nil || colName == "" { + if asset == nil || colName == "" { return errors.New("invalid arguments: all parameters must be non-nil and colName must not be empty") } @@ -240,6 +243,14 @@ func (p *LineageExtractor) addColumnToAsset(asset *Asset, colName string, upstre return nil } + if upstreamAsset == nil { + existingCol := asset.GetColumnWithName(strings.ToLower(upstreamCol.Name)) + if existingCol == nil { + asset.Columns = append(asset.Columns, *upstreamCol) + return nil + } + return nil + } existingCol := asset.GetColumnWithName(colName) if existingCol != nil { if len(existingCol.Description) == 0 { @@ -254,6 +265,7 @@ func (p *LineageExtractor) addColumnToAsset(asset *Asset, colName string, upstre newUpstream := UpstreamColumn{ Column: upstreamCol.Name, } + if upstreamAsset != nil { newUpstream.Table = upstreamAsset.Name } @@ -293,8 +305,7 @@ func (p *LineageExtractor) addColumnToAsset(asset *Asset, colName string, upstre // upstreamExists checks if a given upstream already exists in the list. func upstreamExists(upstreams []*UpstreamColumn, newUpstream UpstreamColumn) bool { for _, existingUpstream := range upstreams { - if strings.EqualFold(existingUpstream.Column, newUpstream.Column) && - strings.EqualFold(existingUpstream.Table, newUpstream.Table) { + if strings.EqualFold(existingUpstream.Column, newUpstream.Column) { return true } } diff --git a/pythonsrc/parser/main.py b/pythonsrc/parser/main.py index 5c880cb9..82a3cc18 100644 --- a/pythonsrc/parser/main.py +++ b/pythonsrc/parser/main.py @@ -1,7 +1,6 @@ import json import logging from dataclasses import dataclass - from sqlglot import parse_one, exp, lineage from sqlglot.lineage import Node from sqlglot.optimizer import optimize @@ -171,7 +170,6 @@ def get_column_lineage(query: str, schema: dict, dialect: str): {"column": column.name, "table": column.table} ) non_selected_columns = list(non_selected_columns_dict.values()) - return { "columns": result, "non_selected_columns": non_selected_columns,