Skip to content

Commit

Permalink
more changes
Browse files Browse the repository at this point in the history
  • Loading branch information
y-bruin committed Jan 31, 2025
1 parent c9e2f6f commit 234d16f
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 14 deletions.
7 changes: 5 additions & 2 deletions pkg/pipeline/lineage.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,10 @@ func (p *LineageExtractor) processLineageColumns(foundPipeline *Pipeline, asset
if upstream.Table == asset.Name {
continue
}
upstreamAsset := foundPipeline.GetAssetByName(upstream.Table)

tableSpec := strings.Split(upstream.Table, ".")
table_name := tableSpec[len(strings.Split(upstream.Table, "."))-1]

Check failure on line 195 in pkg/pipeline/lineage.go

View workflow job for this annotation

GitHub Actions / lint

ST1003: should not use underscores in Go names; var table_name should be tableName (stylecheck)
upstreamAsset := foundPipeline.GetAssetByName(table_name)
if upstreamAsset == nil && upstream.Table != "" {
if err := p.addColumnToAsset(asset, lineageCol.Name, nil, &Column{
Name: upstream.Column,
Expand All @@ -199,7 +202,7 @@ func (p *LineageExtractor) processLineageColumns(foundPipeline *Pipeline, asset
Upstreams: []*UpstreamColumn{
{
Column: upstream.Column,
Table: strings.ToLower(upstream.Table),
Table: strings.ToLower(table_name),
},
},
}); err != nil {
Expand Down
13 changes: 1 addition & 12 deletions pythonsrc/parser/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ def extract_non_selected_columns(parsed: exp.Select) -> list[Column]:

result = list(set(cols))
result.sort(key=lambda x: x.name + x.table)
for c in result:
c = Column(name=c.name, table=extract_table_name(c.table))
return result


Expand Down Expand Up @@ -117,7 +115,7 @@ def get_column_lineage(query: str, schema: dict, dialect: str):
nested_schema = schema_dict_to_schema_object(schema)
try:
optimized = optimize(parsed, nested_schema, dialect=dialect)
except Exception as e:
except Exception:
# try again without dialect, this solves some issues, e.g. https://github.com/tobymao/sqlglot/issues/4538
optimized = optimize(parsed, nested_schema)
except Exception as e:
Expand Down Expand Up @@ -157,8 +155,6 @@ def get_column_lineage(query: str, schema: dict, dialect: str):
cl = [dict(t) for t in {tuple(d.items()) for d in cl}]
cl.sort(key=lambda x: x["table"])

for c in cl:
c["table"] = extract_table_name(c["table"])
result.append({"name": col["name"], "upstream": cl, "type": col["type"]})

result.sort(key=lambda x: x["name"])
Expand Down Expand Up @@ -188,13 +184,6 @@ def find_leaf_nodes(node: Node, leaf_nodes):
find_leaf_nodes(child, leaf_nodes)


def extract_table_name(table: str) -> str:
    """Return the lower-cased final dot-separated segment of a table reference.

    Args:
        table: A table reference that may be qualified with dots,
            e.g. ``"db.schema.Users"``. Falsy values (empty string or
            ``None``) are accepted.

    Returns:
        The text after the last ``"."`` converted to lower case, the whole
        string lower-cased when it contains no ``"."``, or ``""`` when
        ``table`` is falsy.
    """
    if not table:
        return ""
    # rpartition splits on the LAST "." only; element [2] is everything after
    # it, or the whole input when no "." is present.
    return table.rpartition(".")[2].lower()


def merge_parts(table: exp.Table) -> str:
return ".".join(
part.name for part in table.parts if isinstance(part, exp.Identifier)
Expand Down

0 comments on commit 234d16f

Please sign in to comment.