Skip to content

Commit

Permalink
validate bigquery query sensor fields
Browse files Browse the repository at this point in the history
  • Loading branch information
karakanb committed May 6, 2024
1 parent e7339e8 commit b896830
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pkg/executor/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ var DefaultExecutorsV2 = map[pipeline.AssetType]Config{
pipeline.AssetTypeBigqueryTableSensor: {
scheduler.TaskInstanceTypeMain: NoOpOperator{},
},
"bq.sensor.query": {
pipeline.AssetTypeBigqueryQuerySensor: {
scheduler.TaskInstanceTypeMain: NoOpOperator{},
},
"bq.cost_tracker": {
Expand Down
25 changes: 25 additions & 0 deletions pkg/lint/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,3 +569,28 @@ func EnsureBigQueryTableSensorHasTableParameterForASingleAsset(ctx context.Conte

return issues, nil
}

func EnsureBigQueryQuerySensorHasTableParameterForASingleAsset(ctx context.Context, p *pipeline.Pipeline, asset *pipeline.Asset) ([]*Issue, error) {
issues := make([]*Issue, 0)
if asset.Type != pipeline.AssetTypeBigqueryQuerySensor {
return issues, nil
}

query, ok := asset.Parameters["query"]
if !ok {
issues = append(issues, &Issue{
Task: asset,
Description: "BigQuery query sensor requires a `query` parameter",
})
return issues, nil
}

if query == "" {
issues = append(issues, &Issue{
Task: asset,
Description: "BigQuery query sensor requires a `query` parameter that is not empty",
})
}

return issues, nil
}
78 changes: 78 additions & 0 deletions pkg/lint/rules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1418,6 +1418,84 @@ func TestEnsureSnowflakeSensorHasQueryParameter(t *testing.T) {
}
}

func TestEnsureBigqueryQuerySensorHasQueryParameter(t *testing.T) {
t.Parallel()

tests := []struct {
name string
p *pipeline.Pipeline
want []string
wantErr assert.ErrorAssertionFunc
}{
{
name: "no query param",
p: &pipeline.Pipeline{
Assets: []*pipeline.Asset{
{
Name: "task1",
Type: pipeline.AssetTypeBigqueryQuerySensor,
},
},
},
wantErr: assert.NoError,
want: []string{"BigQuery query sensor requires a `query` parameter"},
},
{
name: "query param exists but empty",
p: &pipeline.Pipeline{
Assets: []*pipeline.Asset{
{
Name: "task1",
Type: pipeline.AssetTypeBigqueryQuerySensor,
Parameters: map[string]string{
"query": "",
},
},
},
},
wantErr: assert.NoError,
want: []string{"BigQuery query sensor requires a `query` parameter that is not empty"},
},
{
name: "no issues",
p: &pipeline.Pipeline{
Assets: []*pipeline.Asset{
{
Name: "task1",
Type: pipeline.AssetTypeBigqueryQuerySensor,
Parameters: map[string]string{
"query": "SELECT 1",
},
},
},
},
wantErr: assert.NoError,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

got, err := CallFuncForEveryAsset(EnsureBigQueryQuerySensorHasTableParameterForASingleAsset)(tt.p)
if !tt.wantErr(t, err) {
return
}

// I am doing this because I don't care if I get a nil or empty slice
if tt.want != nil {
gotMessages := make([]string, len(got))
for i, issue := range got {
gotMessages[i] = issue.Description
}

assert.Equal(t, tt.want, gotMessages)
} else {
assert.Equal(t, []*Issue{}, got)
}
})
}
}

func TestEnsureBigQueryTableSensorHasTableParameter(t *testing.T) {
t.Parallel()

Expand Down
1 change: 1 addition & 0 deletions pkg/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
AssetTypeSnowflakeQuerySensor = AssetType("sf.sensor.query")
AssetTypeBigqueryQuery = AssetType("bq.sql")
AssetTypeBigqueryTableSensor = AssetType("bq.sensor.table")
AssetTypeBigqueryQuerySensor = AssetType("bq.sensor.query")
AssetTypeEmpty = AssetType("empty")
AssetTypePostgresQuery = AssetType("pg.sql")
AssetTypeRedshiftQuery = AssetType("rs.sql")
Expand Down

0 comments on commit b896830

Please sign in to comment.