Skip to content

Commit 29f8189

Browse files
authored
Added support for datatype in lineage (bruin-data#280)
* Return the data type for sqlparser
1 parent 0df2ae8 commit 29f8189

File tree

6 files changed

+421
-37
lines changed

6 files changed

+421
-37
lines changed

pkg/pipeline/lineage.go

+5-2
Original file line number | Diff line number | Diff line change
@@ -87,10 +87,12 @@ func (p *LineageExtractor) parseLineage(asset *Asset) error {
8787
if err != nil {
8888
return fmt.Errorf("failed to render the query: %w", err)
8989
}
90+
9091
lineage, err := p.sqlParser.ColumnLineage(query, dialect, p.columnMetadata)
9192
if err != nil {
9293
return fmt.Errorf("failed to parse column lineage: %w", err)
9394
}
95+
9496
return p.processLineageColumns(asset, lineage)
9597
}
9698

@@ -127,6 +129,7 @@ func (p *LineageExtractor) processLineageColumns(asset *Asset, lineage *sqlparse
127129
if len(lineageCol.Upstream) == 0 {
128130
if err := p.addColumnToAsset(asset, lineageCol.Name, nil, &Column{
129131
Name: lineageCol.Name,
132+
Type: lineageCol.Type,
130133
Checks: []ColumnCheck{},
131134
Upstreams: []*UpstreamColumn{},
132135
}); err != nil {
@@ -146,7 +149,7 @@ func (p *LineageExtractor) processLineageColumns(asset *Asset, lineage *sqlparse
146149
if upstreamAsset == nil {
147150
if err := p.addColumnToAsset(asset, lineageCol.Name, nil, &Column{
148151
Name: upstream.Column,
149-
Type: strings.ToLower(upstream.Table),
152+
Type: lineageCol.Type,
150153
Checks: []ColumnCheck{},
151154
Upstreams: []*UpstreamColumn{
152155
{
@@ -165,7 +168,7 @@ func (p *LineageExtractor) processLineageColumns(asset *Asset, lineage *sqlparse
165168
if upstreamCol == nil {
166169
upstreamCol = &Column{
167170
Name: upstream.Column,
168-
Type: upstream.Table,
171+
Type: lineageCol.Type,
169172
Checks: []ColumnCheck{},
170173
Upstreams: []*UpstreamColumn{
171174
{

pkg/pipeline/lineage_test.go

+2
Original file line number | Diff line number | Diff line change
@@ -766,6 +766,7 @@ func testAdvancedSQLFeatures(t *testing.T) {
766766
{
767767
Name: "report_generated_at",
768768
Upstreams: []*UpstreamColumn{{}},
769+
Type: "UNKNOWN",
769770
},
770771
},
771772
Upstreams: []Upstream{{Value: "raw_sales"}},
@@ -994,6 +995,7 @@ func testDialectSpecificFeatures(t *testing.T) {
994995
Name: "level",
995996
Upstreams: []*UpstreamColumn{},
996997
UpdateOnMerge: false,
998+
Type: "INT",
997999
},
9981000
{
9991001
Name: "dept_stats",

pkg/sqlparser/parser.go

+1
Original file line number | Diff line number | Diff line change
@@ -122,6 +122,7 @@ type UpstreamColumn struct {
122122
type ColumnLineage struct {
123123
Name string `json:"name"`
124124
Upstream []UpstreamColumn `json:"upstream"`
125+
Type string `json:"type"`
125126
}
126127
type Lineage struct {
127128
Columns []ColumnLineage `json:"columns"`

pkg/sqlparser/parser_test.go

+42-16
Original file line number | Diff line number | Diff line change
@@ -45,18 +45,21 @@ func GetLineageForRunner(t *testing.T, s lineager) {
4545
{Column: "a", Table: "table1"},
4646
{Column: "a", Table: "table2"},
4747
},
48+
Type: "TEXT",
4849
},
4950
{
5051
Name: "b",
5152
Upstream: []UpstreamColumn{
5253
{Column: "b", Table: "table1"},
5354
},
55+
Type: "BIGINT",
5456
},
5557
{
5658
Name: "c",
5759
Upstream: []UpstreamColumn{
5860
{Column: "c", Table: "table2"},
5961
},
62+
Type: "BIGINT",
6063
},
6164
},
6265
},
@@ -86,13 +89,15 @@ func GetLineageForRunner(t *testing.T, s lineager) {
8689
Upstream: []UpstreamColumn{
8790
{Column: "item_id", Table: "items"},
8891
},
92+
Type: "TEXT",
8993
},
9094
{
9195
Name: "price_category",
9296
Upstream: []UpstreamColumn{
9397
{Column: "price", Table: "items"},
9498
{Column: "somecol", Table: "orders"},
9599
},
100+
Type: "VARCHAR",
96101
},
97102
},
98103
},
@@ -115,12 +120,14 @@ func GetLineageForRunner(t *testing.T, s lineager) {
115120
Upstream: []UpstreamColumn{
116121
{Column: "col1", Table: "table1"},
117122
},
123+
Type: "BIGINT",
118124
},
119125
{
120126
Name: "col2",
121127
Upstream: []UpstreamColumn{
122128
{Column: "col2", Table: "table2"},
123129
},
130+
Type: "BIGINT",
124131
},
125132
},
126133
},
@@ -142,12 +149,14 @@ func GetLineageForRunner(t *testing.T, s lineager) {
142149
Upstream: []UpstreamColumn{
143150
{Column: "customer_id", Table: "orders"},
144151
},
152+
Type: "TEXT",
145153
},
146154
{
147155
Name: "order_count",
148156
Upstream: []UpstreamColumn{
149157
{Column: "order_id", Table: "orders"},
150158
},
159+
Type: "BIGINT",
151160
},
152161
},
153162
},
@@ -167,16 +176,18 @@ func GetLineageForRunner(t *testing.T, s lineager) {
167176
want: &Lineage{
168177
Columns: []ColumnLineage{
169178
{
170-
Name: "avg_salary",
179+
Name: "emp_id",
171180
Upstream: []UpstreamColumn{
172-
{Column: "salary", Table: "salaries"},
181+
{Column: "emp_id", Table: "employees"},
173182
},
183+
Type: "TEXT",
174184
},
175185
{
176-
Name: "emp_id",
186+
Name: "avg_salary",
177187
Upstream: []UpstreamColumn{
178-
{Column: "emp_id", Table: "employees"},
188+
{Column: "salary", Table: "salaries"},
179189
},
190+
Type: "DOUBLE",
180191
},
181192
},
182193
},
@@ -200,13 +211,15 @@ func GetLineageForRunner(t *testing.T, s lineager) {
200211
{Column: "id", Table: "customers"},
201212
{Column: "id", Table: "employees"},
202213
},
214+
Type: "TEXT",
203215
},
204216
{
205217
Name: "name",
206218
Upstream: []UpstreamColumn{
207219
{Column: "name", Table: "customers"},
208220
{Column: "name", Table: "employees"},
209221
},
222+
Type: "TEXT",
210223
},
211224
},
212225
},
@@ -228,12 +241,14 @@ func GetLineageForRunner(t *testing.T, s lineager) {
228241
Upstream: []UpstreamColumn{
229242
{Column: "id", Table: "employees"},
230243
},
244+
Type: "TEXT",
231245
},
232246
{
233247
Name: "manager_id",
234248
Upstream: []UpstreamColumn{
235249
{Column: "manager_id", Table: "employees"},
236250
},
251+
Type: "TEXT",
237252
},
238253
},
239254
},
@@ -263,27 +278,31 @@ func GetLineageForRunner(t *testing.T, s lineager) {
263278
},
264279
want: &Lineage{
265280
Columns: []ColumnLineage{
266-
{
267-
Name: "fixed",
268-
Upstream: []UpstreamColumn{},
269-
},
270281
{
271282
Name: "id",
272283
Upstream: []UpstreamColumn{
273284
{Column: "id", Table: "sales"},
274285
},
286+
Type: "TEXT",
275287
},
276288
{
277-
Name: "region_abbr",
289+
Name: "sale_size",
278290
Upstream: []UpstreamColumn{
279-
{Column: "name", Table: "regions"},
291+
{Column: "amount", Table: "sales"},
280292
},
293+
Type: "VARCHAR",
281294
},
282295
{
283-
Name: "sale_size",
296+
Name: "region_abbr",
284297
Upstream: []UpstreamColumn{
285-
{Column: "amount", Table: "sales"},
298+
{Column: "name", Table: "regions"},
286299
},
300+
Type: "VARCHAR",
301+
},
302+
{
303+
Name: "fixed",
304+
Upstream: []UpstreamColumn{},
305+
Type: "VARCHAR",
287306
},
288307
},
289308
},
@@ -318,34 +337,41 @@ func GetLineageForRunner(t *testing.T, s lineager) {
318337
{Column: "a", Table: "table1"},
319338
{Column: "a", Table: "table2"},
320339
},
340+
Type: "TEXT",
321341
},
322342
{
323343
Name: "b",
324344
Upstream: []UpstreamColumn{
325345
{Column: "b", Table: "table1"},
326346
},
347+
Type: "BIGINT",
327348
},
328349
{
329-
Name: "b2",
350+
Name: "c",
330351
Upstream: []UpstreamColumn{
331-
{Column: "b", Table: "table1"},
352+
{Column: "c", Table: "table2"},
332353
},
354+
Type: "TEXT",
333355
},
334356
{
335-
Name: "c",
357+
Name: "b2",
336358
Upstream: []UpstreamColumn{
337-
{Column: "c", Table: "table2"},
359+
{Column: "b", Table: "table1"},
338360
},
361+
Type: "BIGINT",
339362
},
363+
340364
{
341365
Name: "c2",
342366
Upstream: []UpstreamColumn{
343367
{Column: "c", Table: "table2"},
344368
},
369+
Type: "TEXT",
345370
},
346371
{
347372
Name: "updated_at",
348373
Upstream: []UpstreamColumn{},
374+
Type: "UNKNOWN",
349375
},
350376
},
351377
},

pythonsrc/parser/main.py

+6-4
Original file line number | Diff line number | Diff line change
@@ -25,8 +25,10 @@ def extract_columns(parsed):
2525
for expression in found.expressions:
2626
if isinstance(expression, exp.CTE):
2727
continue
28-
29-
cols.append(expression.alias_or_name)
28+
cols.append({
29+
"name": expression.alias_or_name,
30+
"type": str(expression.type),
31+
})
3032

3133
return cols
3234

@@ -71,7 +73,7 @@ def get_column_lineage(query: str, schema: dict, dialect: str):
7173
cols = extract_columns(optimized)
7274
for col in cols:
7375
try:
74-
ll = lineage.lineage(col, optimized, schema, dialect=dialect)
76+
ll = lineage.lineage(col["name"], optimized, schema, dialect=dialect)
7577
except:
7678
continue
7779

@@ -92,7 +94,7 @@ def get_column_lineage(query: str, schema: dict, dialect: str):
9294
cl = [dict(t) for t in {tuple(d.items()) for d in cl}]
9395
cl.sort(key=lambda x: x["table"])
9496

95-
result.append({"name": col, "upstream": cl})
97+
result.append({"name": col["name"], "upstream": cl, "type": col["type"]})
9698

9799
result.sort(key=lambda x: x["name"])
98100

0 commit comments

Comments
 (0)